You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/03/25 19:39:43 UTC

svn commit: r758397 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/ test/java/org/apache/qpid/server/exchange/ test/java/org/apache/qpid/server/filter/ test/java/org/apache/qpid/server/qu...

Author: ritchiem
Date: Wed Mar 25 18:39:25 2009
New Revision: 758397

URL: http://svn.apache.org/viewvc?rev=758397&view=rev
Log:
QPID-1735 : Added Documentation to QueueBackingStore around thread safety of load/unload, Updated FQBS to adhere to the thread safety specified by the interface.  QueueEntry was updated to return the AMQMessage from the load() to complete the getMessage() interface. As in a flowed state the message may be purged before a reference can be taken. Added new Test QueueEntryImplThreadingTest that should later be run for longer but aims to show that load always returns the message even when unloads are occuring asynchronously.

    Commit from 0.5-release : r758388


Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java Wed Mar 25 18:39:25 2009
@@ -157,10 +157,10 @@
         {
             try
             {
-                input.close();
-                // We can purge the message here then reflow it if required but I believe it to be cleaner to leave it
-                // on disk until it has been deleted from the queue at that point we can be sure we won't need the data
-                //handle.delete();
+                if (input != null)
+                {
+                    input.close();
+                }
             }
             catch (IOException e)
             {
@@ -171,101 +171,123 @@
         throw new UnableToRecoverMessageException(error);
     }
 
+    /**
+     * Thread safety is ensured here by synchronizing on the message object.
+     *
+     * This is safe as load() calls will fail until the first thread through here has created the file on disk
+     * and fully written the content.
+     *
+     * After this point new AMQMessages can exist that reference the same data thus breaking the synchronisation.
+     *
+     * Thread safety is maintained here as the existence of the file is checked allowing then subsequent unload() calls
+     * to skip the writing.
+     *
+     * Multiple unload() calls will initially be blocked using the synchronization until the data exists on disk thus
+     * safely allowing any reference to the message to be cleared prompting a load call.
+     *
+     * @param message the message to unload
+     * @throws UnableToFlowMessageException
+     */
     public void unload(AMQMessage message) throws UnableToFlowMessageException
     {
-        long messageId = message.getMessageId();
+        //Synchorize on the message to ensure that one only thread can unload at a time.
+        // If a second unload is attempted then it will block until the unload has completed.
+        synchronized (message)
+        {
+            long messageId = message.getMessageId();
 
-        File handle = getFileHandle(messageId);
+            File handle = getFileHandle(messageId);
 
-        //If we have written the data once then we don't need to do it again.
-        if (handle.exists())
-        {
-            if (_log.isDebugEnabled())
+            //If we have written the data once then we don't need to do it again.
+            if (handle.exists())
             {
-                _log.debug("Message(ID:" + messageId + ") already unloaded.");
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug("Message(ID:" + messageId + ") already unloaded.");
+                }
+                return;
             }
-            return;
-        }
 
-        if (_log.isInfoEnabled())
-        {
-            _log.info("Unloading Message (ID:" + messageId + ")");
-        }
-
-        ObjectOutputStream writer = null;
-        Exception error = null;
+            if (_log.isInfoEnabled())
+            {
+                _log.info("Unloading Message (ID:" + messageId + ")");
+            }
 
-        try
-        {
-            writer = new ObjectOutputStream(new FileOutputStream(handle));
+            ObjectOutputStream writer = null;
+            Exception error = null;
 
-            writer.writeLong(message.getArrivalTime());
+            try
+            {
+                writer = new ObjectOutputStream(new FileOutputStream(handle));
 
-            MessagePublishInfo mpi = message.getMessagePublishInfo();
-            writer.writeUTF(String.valueOf(mpi.getExchange()));
-            writer.writeUTF(String.valueOf(mpi.getRoutingKey()));
-            writer.writeBoolean(mpi.isMandatory());
-            writer.writeBoolean(mpi.isImmediate());
-            ContentHeaderBody chb = message.getContentHeaderBody();
+                writer.writeLong(message.getArrivalTime());
 
-            // write out the content header body
-            final int bodySize = chb.getSize();
-            byte[] underlying = new byte[bodySize];
-            ByteBuffer buf = ByteBuffer.wrap(underlying);
-            chb.writePayload(buf);
+                MessagePublishInfo mpi = message.getMessagePublishInfo();
+                writer.writeUTF(String.valueOf(mpi.getExchange()));
+                writer.writeUTF(String.valueOf(mpi.getRoutingKey()));
+                writer.writeBoolean(mpi.isMandatory());
+                writer.writeBoolean(mpi.isImmediate());
+                ContentHeaderBody chb = message.getContentHeaderBody();
+
+                // write out the content header body
+                final int bodySize = chb.getSize();
+                byte[] underlying = new byte[bodySize];
+                ByteBuffer buf = ByteBuffer.wrap(underlying);
+                chb.writePayload(buf);
 
-            writer.writeInt(bodySize);
-            writer.write(underlying, 0, bodySize);
+                writer.writeInt(bodySize);
+                writer.write(underlying, 0, bodySize);
 
-            int bodyCount = message.getBodyCount();
-            writer.writeInt(bodyCount);
+                int bodyCount = message.getBodyCount();
+                writer.writeInt(bodyCount);
 
-            //WriteContentBody
-            for (int index = 0; index < bodyCount; index++)
-            {
-                ContentChunk chunk = message.getContentChunk(index);
-                int length = chunk.getSize();
+                //WriteContentBody
+                for (int index = 0; index < bodyCount; index++)
+                {
+                    ContentChunk chunk = message.getContentChunk(index);
+                    int length = chunk.getSize();
 
-                byte[] chunk_underlying = new byte[length];
+                    byte[] chunk_underlying = new byte[length];
 
-                ByteBuffer chunk_buf = chunk.getData();
+                    ByteBuffer chunk_buf = chunk.getData();
 
-                chunk_buf.duplicate().rewind().get(chunk_underlying);
+                    chunk_buf.duplicate().rewind().get(chunk_underlying);
 
-                writer.writeInt(length);
-                writer.write(chunk_underlying, 0, length);
+                    writer.writeInt(length);
+                    writer.write(chunk_underlying, 0, length);
+                }
             }
-        }
-        catch (FileNotFoundException e)
-        {
-            error = e;
-        }
-        catch (IOException e)
-        {
-            error = e;
-        }
-        finally
-        {
-            // In a FileNotFound situation writer will be null.
-            if (writer != null)
+            catch (FileNotFoundException e)
             {
-                try
-                {
-                    writer.flush();
-                    writer.close();
-                }
-                catch (IOException e)
+                error = e;
+            }
+            catch (IOException e)
+            {
+                error = e;
+            }
+            finally
+            {
+                // In a FileNotFound situation writer will be null.
+                if (writer != null)
                 {
-                    error = e;
+                    try
+                    {
+                        writer.flush();
+                        writer.close();
+                    }
+                    catch (IOException e)
+                    {
+                        error = e;
+                    }
                 }
             }
-        }
 
-        if (error != null)
-        {
-            _log.error("Unable to unload message(" + messageId + ") to disk, restoring state.");
-            handle.delete();
-            throw new UnableToFlowMessageException(messageId, error);
+            if (error != null)
+            {
+                _log.error("Unable to unload message(" + messageId + ") to disk, restoring state.");
+                handle.delete();
+                throw new UnableToFlowMessageException(messageId, error);
+            }
         }
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java Wed Mar 25 18:39:25 2009
@@ -166,7 +166,7 @@
     {
         // If we've increased the minimum memory above what we have in memory then
         // we need to inhale more if there is more
-        if (_atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0)
+        if (!_disabled && _atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0)
         {
             startInhaler();
         }
@@ -204,7 +204,7 @@
      */
     public void entryUnloadedUpdateMemory(QueueEntry queueEntry)
     {
-        if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+        if (!_disabled && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
         {
             _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
         }
@@ -219,7 +219,7 @@
      */
     public void entryLoadedUpdateMemory(QueueEntry queueEntry)
     {
-        if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+        if (!_disabled && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
         {
             _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
             setFlowed(true);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java Wed Mar 25 18:39:25 2009
@@ -26,8 +26,36 @@
 
 public interface QueueBackingStore
 {
+    /**
+     * Retrieve the message with a given ID
+     *
+     * This method must be thread safe.
+     *
+     * Multiple calls to load with a given messageId DO NOT need to return the same object.
+     *
+     * @param messageId the id of the message to retreive.
+     * @return
+     */
     AMQMessage load(Long messageId);
 
+    /**
+     * Store a message in the BackingStore.
+     *
+     * This method must be thread safe understanding that multiple message objects may be the same data.
+     *
+     * Allowing a thread to return from this method means that it is safe to call load()
+     *
+     * Implementer guide:
+     * Until the message has been loaded the message references will all be the same object.
+     *
+     * One appraoch as taken by the @see FileQueueBackingStore is to block aimulataneous calls to this method 
+     * until the message is fully on disk. This can be done by synchronising on message as initially it is always the
+     * same object. Only after a load has taken place will there be a discrepency.
+     *
+     *
+     * @param message the message to unload
+     * @throws UnableToFlowMessageException
+     */
     void unload(AMQMessage message) throws UnableToFlowMessageException;
 
     void delete(Long messageId);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Wed Mar 25 18:39:25 2009
@@ -226,7 +226,7 @@
 
     void unload();
 
-    void load();
+    AMQMessage load();
 
     boolean isFlowed();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Mar 25 18:39:25 2009
@@ -31,6 +31,7 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 public class QueueEntryImpl implements QueueEntry
@@ -41,7 +42,7 @@
 
     private final SimpleQueueEntryList _queueEntryList;
 
-    private AMQMessage _message;
+    private AtomicReference<AMQMessage> _messageRef;
 
     private boolean _redelivered;
 
@@ -102,7 +103,7 @@
     public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
     {
         _queueEntryList = queueEntryList;
-        _message = message;
+        _messageRef = new AtomicReference<AMQMessage>(message);
         if (message != null)
         {
             _messageId = message.getMessageId();
@@ -136,11 +137,7 @@
 
     public AMQMessage getMessage()
     {
-        if (_message == null)
-        {
-            return _backingStore.load(_messageId);
-        }
-        return _message;
+        return load();
     }
 
     public Long getMessageId()
@@ -231,13 +228,17 @@
     public String debugIdentity()
     {
         String entry = "[State:" + _state.getState().name() + "]";
-        if (_message == null)
+
+        AMQMessage message = _messageRef.get();
+
+        if (message == null)
         {
             return entry + "(Message Unloaded ID:" + _messageId + ")";
         }
         else
         {
-            return entry + _message.debugIdentity();
+
+            return entry + message.debugIdentity();
         }
     }
 
@@ -398,23 +399,27 @@
 
     public void unload()
     {
-        if (_message != null && _backingStore != null)
-        {
+        //Get the currently loaded message
+        AMQMessage message = _messageRef.get();
 
+        // If we have a message in memory and we have a valid backingStore attempt to unload
+        if (message != null && _backingStore != null)
+        {
             try
             {
-                if (!_hasBeenUnloaded)
+                // The backingStore will now handle concurrent calls to unload and safely synchronize to ensure
+                // multiple initial unloads are unloads
+                _backingStore.unload(message);
+                _hasBeenUnloaded = true;
+                _messageRef.set(null);
+
+                if (_log.isDebugEnabled())
                 {
-                    _hasBeenUnloaded = true;
+                    _log.debug("Unloaded:" + debugIdentity());
+                }
 
-                    _backingStore.unload(_message);
 
-                    if (_log.isDebugEnabled())
-                    {
-                        _log.debug("Unloaded:" + debugIdentity());
-                    }
-                }
-                _message = null;
+                // Clear the message reference if the loaded message is still the one we are processing.
 
                 //Update the memoryState if this load call resulted in the message being purged from memory
                 if (!_flowed.getAndSet(true))
@@ -434,23 +439,56 @@
         }
     }
 
-    public void load()
+    public AMQMessage load()
     {
+        // MessageId and Backing store are null in test scenarios, normally this is not the case.
         if (_messageId != null && _backingStore != null)
         {
-            _message = _backingStore.load(_messageId);
+            // See if we have the message currently in memory to return
+            AMQMessage message = _messageRef.get();
+            // if we don't then we need to start a load process.
+            if (message == null)
+            {
+                //Synchronize here to ensure only the first thread that attempts to load will perform the load from the
+                // backing store.
+                synchronized (this)
+                {
+                    // Check again to see if someone else ahead of us loaded the message
+                    message = _messageRef.get();
+                    // if we still don't have the message then we need to start a load process.
+                    if (message == null)
+                    {
+                        // Load the message and keep a reference to it
+                        message = _backingStore.load(_messageId);
+                        // Set the message reference
+                        _messageRef.set(message);
+                    }
+                    else
+                    {
+                        // If someone else loaded the message then we can jump out here as the Memory Updates will
+                        // have been performed by the loading thread
+                        return message;
+                    }
+                }
 
-            if (_log.isDebugEnabled())
-            {
-                _log.debug("Loaded:" + debugIdentity());
-            }
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug("Loaded:" + debugIdentity());
+                }
 
-            //Update the memoryState if this load call resulted in the message comming in to memory
-            if (_flowed.getAndSet(false))
-            {
-                _queueEntryList.entryLoadedUpdateMemory(this);
+                //Update the memoryState if this load call resulted in the message comming in to memory
+                if (_flowed.getAndSet(false))
+                {
+                    _queueEntryList.entryLoadedUpdateMemory(this);
+                }
             }
+
+            // Return the message that was either already in memory or the value we just loaded.
+            return message;
         }
+        // This can be null but only in the case where we have no messageId
+        // in the case where we have no backingStore then we will never have unloaded the message
+        return _messageRef.get();
     }
 
     public boolean isFlowed()

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java Wed Mar 25 18:39:25 2009
@@ -32,6 +32,7 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.AMQException;
 
 import java.util.Map;
@@ -96,6 +97,12 @@
         assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
     }
 
+    public void tearDown() throws Exception
+    {
+        //Ensure we close the registry that the MockAMQQueue will create
+        ApplicationRegistry.getInstance().close();
+    }
+
     /**
      * Helper method to create a new subscription and aquire the given messages.
      *

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Wed Mar 25 18:39:25 2009
@@ -36,11 +36,9 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.FailedDequeueException;
 import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageCleanupException;
 import org.apache.qpid.server.queue.MockProtocolSession;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.UnableToFlowMessageException;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
@@ -356,9 +354,9 @@
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void load()
+                public AMQMessage load()
                 {
-                    //To change body of implemented methods use File | Settings | File Templates.
+                    return null;
                 }
 
                 public boolean isFlowed()

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java Wed Mar 25 18:39:25 2009
@@ -23,10 +23,18 @@
 import junit.framework.TestCase;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.MockQueueEntry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 
 public class PropertyExpressionTest extends TestCase
 {
 
+    public void tearDown() throws Exception
+    {
+        //Ensure we close the registry that the MockQueueEntry will create
+        ApplicationRegistry.remove(1);
+    }
+
+
     public void testJMSRedelivered()
     {
         PropertyExpression<AMQException> pe = new PropertyExpression<AMQException>("JMSRedelivered");

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Wed Mar 25 18:39:25 2009
@@ -23,6 +23,14 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
+
+import java.util.LinkedList;
+import java.util.ArrayList;
 
 public class MockAMQMessage extends TransientAMQMessage
 {
@@ -31,6 +39,14 @@
     {
        super(messageId);
         _messagePublishInfo = new MessagePublishInfoImpl(null,false,false,null);
+        BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+
+        properties.setMessageId(String.valueOf(messageId));
+        properties.setTimestamp(System.currentTimeMillis());
+        properties.setDeliveryMode((byte)1);
+
+        _contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID);
+        _contentBodies = new ArrayList<ContentChunk>();
     }
 
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Wed Mar 25 18:39:25 2009
@@ -20,16 +20,18 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 
 import java.util.List;
 import java.util.Set;
@@ -39,10 +41,20 @@
     private boolean _deleted = false;
     private int _queueCount;
     private AMQShortString _name;
+    private VirtualHost _virtualhost;
 
     public MockAMQQueue(String name)
     {
-       _name = new AMQShortString(name);
+        _name = new AMQShortString(name);
+        _virtualhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+        try
+        {
+            _virtualhost.getQueueRegistry().registerQueue(this);
+        }
+        catch (AMQException e)
+        {
+            e.printStackTrace(); 
+        }
     }
 
     public AMQShortString getName()
@@ -67,7 +79,7 @@
 
     public VirtualHost getVirtualHost()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return _virtualhost;
     }
 
     public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
@@ -152,7 +164,7 @@
 
     public int delete() throws AMQException
     {
-       return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
@@ -343,7 +355,7 @@
 
     public void setMinimumAlertRepeatGap(long value)
     {
-        
+
     }
 
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java Wed Mar 25 18:39:25 2009
@@ -40,6 +40,7 @@
 import org.apache.qpid.server.queue.MockAMQQueue;
 import org.apache.qpid.server.queue.MockProtocolSession;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 
 public class ACLManagerTest extends TestCase
 {
@@ -67,6 +68,12 @@
         
         _session = new MockProtocolSession(new TestableMemoryMessageStore());
     }
+
+    public void tearDown() throws Exception
+    {
+        //Ensure we close the registry that the MockAMQQueue will create
+        ApplicationRegistry.getInstance().close();
+    }    
     
     public void testACLManagerConfigurationPluginManager() throws Exception
     {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org