You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/03/11 00:11:10 UTC

svn commit: r752300 [8/12] - in /qpid/branches/qpid-1673/qpid: cpp/ cpp/examples/ cpp/examples/direct/ cpp/examples/failover/ cpp/examples/fanout/ cpp/examples/pub-sub/ cpp/examples/qmf-console/ cpp/examples/request-response/ cpp/examples/tradedemo/ cp...

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Tue Mar 10 23:10:57 2009
@@ -29,8 +29,11 @@
 
 public class AMQQueueFactoryTest extends TestCase
 {
+    final int MAX_SIZE = 50;
+
     QueueRegistry _queueRegistry;
     VirtualHost _virtualHost;
+    protected FieldTable _arguments;
 
     public void setUp()
     {
@@ -41,6 +44,15 @@
         _queueRegistry = _virtualHost.getQueueRegistry();
 
         assertEquals("Queues registered on an empty virtualhost", 0, _queueRegistry.getQueues().size());
+
+
+        _arguments = new FieldTable();
+
+        //Ensure we can call createQueue with the flow-to-disk policy type
+        _arguments.put(AMQQueueFactory.QPID_POLICY_TYPE, AMQQueueFactory.QPID_FLOW_TO_DISK);
+        // each message in the QBAAT is around 9-10 bytes each so only give space for half
+
+        _arguments.put(AMQQueueFactory.QPID_MAX_SIZE, MAX_SIZE);
     }
 
     public void tearDown()
@@ -50,17 +62,19 @@
     }
 
 
-    public void testPriorityQueueRegistration()
+    protected AMQQueue createQueue() throws AMQException
     {
-        FieldTable fieldTable = new FieldTable();
-        fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5);
+        return AMQQueueFactory.createAMQQueueImpl(new AMQShortString(this.getName()), false, new AMQShortString("owner"), false,
+                                               _virtualHost, _arguments);
+    }
 
+
+    public void testQueueRegistration()
+    {
         try
         {
-            AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
-                                               _virtualHost, fieldTable);
-
-            assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());            
+            AMQQueue queue = createQueue();
+            assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
         }
         catch (AMQException e)
         {
@@ -68,18 +82,20 @@
         }
     }
 
-
-    public void testSimpleQueueRegistration()
+    public void testQueueValuesAfterCreation()
     {
         try
         {
-            AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
-                                               _virtualHost, null);
-            assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
+            AMQQueue queue = createQueue();
+
+            assertEquals("MemoryMaximumSize not set correctly:", MAX_SIZE, queue.getMemoryUsageMaximum());
+            assertEquals("MemoryMinimumSize not defaulted to half maximum:", MAX_SIZE / 2, queue.getMemoryUsageMinimum());
+
         }
         catch (AMQException e)
         {
             fail(e.getMessage());
         }
     }
+
 }

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Mar 10 23:10:57 2009
@@ -73,7 +73,7 @@
         sendMessages(messageCount, false);
         assertTrue(_queueMBean.getMessageCount() == messageCount);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
-        long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+        long queueDepth = (messageCount * MESSAGE_SIZE);
         assertTrue(_queueMBean.getQueueDepth() == queueDepth);
 
         _queueMBean.deleteMessageFromTop();
@@ -94,7 +94,7 @@
         sendMessages(messageCount, true);
         assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
-        long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+        long queueDepth = (messageCount * MESSAGE_SIZE);
         assertTrue(_queueMBean.getQueueDepth() == queueDepth);
 
         _queueMBean.deleteMessageFromTop();
@@ -175,7 +175,7 @@
 
         assertTrue(_queueMBean.getMaximumMessageCount() == 50000);
         assertTrue(_queueMBean.getMaximumMessageSize() == 2000);
-        assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10));
+        assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth));
 
         assertTrue(_queueMBean.getName().equals("testQueue"));
         assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Tue Mar 10 23:10:57 2009
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 
 public class MockAMQMessage extends TransientAMQMessage
 {
@@ -29,6 +30,7 @@
             throws AMQException
     {
        super(messageId);
+        _messagePublishInfo = new MessagePublishInfoImpl(null,false,false,null);
     }
 
 

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Tue Mar 10 23:10:57 2009
@@ -115,6 +115,11 @@
         return false;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public boolean isFlowed()
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public int getMessageCount()
     {
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
@@ -216,6 +221,26 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public long getMemoryUsageMaximum()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setMemoryUsageMaximum(long maximumMemoryUsage)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getMemoryUsageMinimum()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setMemoryUsageMinimum(long minimumMemoryUsage)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public long getMaximumMessageSize()
     {
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
@@ -271,7 +296,6 @@
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    @Override
     public void checkMessageStatus() throws AMQException
     {
         //To change body of implemented methods use File | Settings | File Templates.
@@ -302,6 +326,11 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public long getMemoryUsageCurrent()
+    {
+        return 0;
+    }
+
     public ManagedObject getManagedObject()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
@@ -312,7 +341,6 @@
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    @Override
     public void setMinimumAlertRepeatGap(long value)
     {
         

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java Tue Mar 10 23:10:57 2009
@@ -21,16 +21,25 @@
 package org.apache.qpid.server.queue;
 
 import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class QueueEntryImplTest extends TestCase
 {
 
-    /**
-     * Test the Redelivered state of a QueueEntryImpl
-     */
+    /** Test the Redelivered state of a QueueEntryImpl */
     public void testRedelivered()
     {
-        QueueEntry entry = new QueueEntryImpl(null, null);
+        QueueEntry entry = new MockQueueEntry(null);
 
         assertFalse("New message should not be redelivered", entry.isRedelivered());
 
@@ -45,5 +54,187 @@
 
     }
 
+    public void testImmediateAndNotDelivered()
+    {
+        AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
+        int bodySize = 0;
+
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+        props.setAppId("HandleTest");
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+        try
+        {
+            message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+            QueueEntry queueEntry = new MockQueueEntry(message);
+
+            assertTrue("Undelivered Immediate message should still be marked as so", queueEntry.immediateAndNotDelivered());
+
+            assertFalse("Undelivered Message should not say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+            queueEntry.setDeliveredToConsumer();
+
+            assertTrue("Delivered Message should say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+            assertFalse("Delivered Immediate message now be marked as so", queueEntry.immediateAndNotDelivered());
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    public void testNotImmediateAndNotDelivered()
+    {
+        AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+        int bodySize = 0;
+
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+        props.setAppId("HandleTest");
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+        try
+        {
+            message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+            QueueEntry queueEntry = new MockQueueEntry(message);
+
+            assertFalse("Undelivered Non-Immediate message should not result in true.", queueEntry.immediateAndNotDelivered());
+
+            assertFalse("Undelivered Message should not say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+            queueEntry.setDeliveredToConsumer();
+
+            assertTrue("Delivered Message should say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+            assertFalse("Delivered Non-Immediate message not change this return", queueEntry.immediateAndNotDelivered());
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    public void testExpiry()
+    {
+        AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+        int bodySize = 0;
+
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+        props.setAppId("HandleTest");
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+        ReentrantLock waitLock = new ReentrantLock();
+        Condition wait = waitLock.newCondition();
+        try
+        {
+            message.setExpiration(System.currentTimeMillis() + 10L);
+
+            message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+            QueueEntry queueEntry = new MockQueueEntry(message);
+
+            assertFalse("New messages should not be expired.", queueEntry.expired());
+
+            final long MILLIS = 1000000L;
+            long waitTime = 20 * MILLIS;
+
+            while (waitTime > 0)
+            {
+                try
+                {
+                    waitLock.lock();
+
+                    waitTime = wait.awaitNanos(waitTime);
+                }
+                catch (InterruptedException e)
+                {
+                    //Stop if we are interrupted
+                    fail(e.getMessage());
+                }
+                finally
+                {
+                    waitLock.unlock();
+                }
+
+            }
+
+            assertTrue("After a sleep messages should now be expired.", queueEntry.expired());
+
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    public void testNoExpiry()
+    {
+        AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+        int bodySize = 0;
+
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+        props.setAppId("HandleTest");
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+        ReentrantLock waitLock = new ReentrantLock();
+        Condition wait = waitLock.newCondition();
+        try
+        {
+
+            message.setPublishAndContentHeaderBody(null, mpi, chb);
+            
+            QueueEntry queueEntry = new MockQueueEntry(message);
+
+            assertFalse("New messages should not be expired.", queueEntry.expired());
+
+            final long MILLIS = 1000000L;
+            long waitTime = 10 * MILLIS;
+
+            while (waitTime > 0)
+            {
+                try
+                {
+                    waitLock.lock();
+
+                    waitTime = wait.awaitNanos(waitTime);
+                }
+                catch (InterruptedException e)
+                {
+                    //Stop if we are interrupted
+                    fail(e.getMessage());
+                }
+                finally
+                {
+                    waitLock.unlock();
+                }
+
+            }
+
+            assertFalse("After a sleep messages without an expiry should not expire.", queueEntry.expired());
+
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
 
 }

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Tue Mar 10 23:10:57 2009
@@ -21,8 +21,6 @@
  */
 
 import junit.framework.TestCase;
-
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
@@ -31,17 +29,18 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -51,7 +50,7 @@
 
     protected SimpleAMQQueue _queue;
     protected VirtualHost _virtualHost;
-    protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
+    protected TestableMemoryMessageStore _transactionLog = new TestableMemoryMessageStore();
     protected AMQShortString _qname = new AMQShortString("qname");
     protected AMQShortString _owner = new AMQShortString("owner");
     protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -60,7 +59,7 @@
     protected FieldTable _arguments = null;
 
     MessagePublishInfo info = new MessagePublishInfoImpl();
-    private static final long MESSAGE_SIZE = 100;
+    protected static long MESSAGE_SIZE = 100;
 
     @Override
     protected void setUp() throws Exception
@@ -70,7 +69,7 @@
         ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
 
         PropertiesConfiguration env = new PropertiesConfiguration();
-        _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store);
+        _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog);
         applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
 
         _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -320,8 +319,8 @@
     public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
     {
         // Create IncomingMessage and nondurable queue
-        NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
-        IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store);
+        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+        IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.properties = new BasicContentHeaderProperties();
@@ -335,18 +334,18 @@
         // Send persistent message
         qs.add(_queue);
         msg.enqueue(qs);
-        msg.routingComplete(_store);
+        msg.routingComplete(_transactionLog);
 
-        _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
+        _transactionLog.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
 
         // Check that it is enqueued
-        List<AMQQueue> data = _store.getMessageReferenceMap(messageId);
+        List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
         assertNotNull(data);
 
         // Dequeue message
         ContentHeaderBody header = new ContentHeaderBody();
         header.bodySize = MESSAGE_SIZE;
-        AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
+        AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _transactionLog);
         message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
 
         MockQueueEntry entry = new MockQueueEntry(message, _queue);
@@ -355,10 +354,164 @@
         entry.dequeue(null);
 
         // Check that it is dequeued
-        data = _store.getMessageReferenceMap(messageId);
+        data = _transactionLog.getMessageReferenceMap(messageId);
         assertNull(data);
     }
 
+    public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+    {
+        // Create IncomingMessage and nondurable queue
+        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+        MESSAGE_SIZE = 1;
+        long MEMORY_MAX = 500;
+        int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
+        //Set the Memory Usage to be very low
+        _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+        for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
+        {
+            sendMessage(txnContext);
+        }
+
+        //Check that we can hold MESSAGE_COUNT / 2 messages without flowing
+        assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+        assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+        // Send another and ensure we are flowed
+        sendMessage(txnContext);
+        assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+        assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+        //send another (MESSAGE_COUNT / 2) - 1 so there are MESSAGE_COUNT msgs in total on the queue
+        for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
+        {
+            sendMessage(txnContext);
+
+            long usage = _queue.getMemoryUsageCurrent();
+            assertTrue("Queue has gone over quota:" + usage,
+                       usage <= _queue.getMemoryUsageMaximum());
+
+            assertTrue("Queue has a negative quota:" + usage, usage > 0);
+
+        }
+        assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+        assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+        _queue.registerSubscription(_subscription, false);
+
+        int slept = 0;
+        while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+        {
+            Thread.sleep(500);
+            slept++;
+        }
+
+        //Ensure the messages are retrieved
+        assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+
+        //Check the queue is still within its limits.
+        long current = _queue.getMemoryUsageCurrent();
+        assertTrue("Queue has gone over quota:" + current + "/" + _queue.getMemoryUsageMaximum(),
+                   current <= _queue.getMemoryUsageMaximum());
+
+        assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
+
+        for (int index = 0; index < MESSAGE_COUNT; index++)
+        {
+            // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+            AMQMessage message = _subscription.getMessages().get(index);
+            assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
+        }
+    }
+
+    public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
+    {
+        // Create IncomingMessage and nondurable queue
+        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+        MESSAGE_SIZE = 1;
+        /** Set to larger than the purge batch size. Default 100.
+         * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */
+        long MEMORY_MAX = 500;
+        int MESSAGE_COUNT = (int) MEMORY_MAX;
+        //Set the Memory Usage to be very low
+        _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+        for (int msgCount = 0; msgCount < MESSAGE_COUNT; msgCount++)
+        {
+            sendMessage(txnContext);
+        }
+
+        //Check that we can hold all messages without flowing
+        assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+        assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+        // Send another and ensure we are flowed
+        sendMessage(txnContext);
+        assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+        assertEquals(MESSAGE_COUNT, _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+        _queue.setMemoryUsageMaximum(0L);
+
+        //Give the purger time to work maximum of 1s
+        int slept = 0;
+        while (_queue.getMemoryUsageCurrent() > 0 && slept < 5)
+        {
+            Thread.yield();
+            Thread.sleep(200);
+            slept++;
+        }
+
+        assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+        assertEquals(0L, _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+    }
+
+    protected void sendMessage(TransactionalContext txnContext) throws AMQException
+    {
+        sendMessage(txnContext, 5);
+    }
+
+    protected void sendMessage(TransactionalContext txnContext, int priority) throws AMQException
+    {
+        IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
+
+        ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+        contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+        contentHeaderBody.bodySize = MESSAGE_SIZE;
+        contentHeaderBody.properties = new BasicContentHeaderProperties();
+        ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+        ((BasicContentHeaderProperties) contentHeaderBody.properties).setPriority((byte) priority);
+        msg.setContentHeaderBody(contentHeaderBody);
+
+        long messageId = msg.getMessageId();
+
+        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
+        // Send a single persistent message
+
+        qs.add(_queue);
+        msg.enqueue(qs);
+
+        msg.routingComplete(_transactionLog);
+
+        msg.addContentBodyFrame(new MockContentChunk(1));
+
+        msg.deliverToQueues();
+
+        //Check message was correctly enqueued
+        List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(data);
+    }
+
+
     // FIXME: move this to somewhere useful
     private static AMQMessage createMessage(final MessagePublishInfo publishBody)
     {
@@ -384,7 +537,7 @@
 
     public AMQMessage createMessage() throws AMQException
     {
-        AMQMessage message = new TestMessage(info, _store);
+        AMQMessage message = new TestMessage(info, _transactionLog);
 
         ContentHeaderBody header = new ContentHeaderBody();
         header.bodySize = MESSAGE_SIZE;
@@ -410,7 +563,6 @@
             _transactionLog = transactionLog;
         }
 
-
         void assertCountEquals(int expected)
         {
             assertEquals("Wrong count for message with tag " + _tag, expected,

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java Tue Mar 10 23:10:57 2009
@@ -287,180 +287,5 @@
         assertFalse(_message.isPersistent());
     }
 
-    public void testImmediateAndNotDelivered()
-    {
-        _message = newMessage();
-
-        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
-        int bodySize = 0;
-
-        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
-        props.setAppId("HandleTest");
-
-        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
-        try
-        {
-            _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
-            assertTrue("Undelivered Immediate message should still be marked as so", _message.immediateAndNotDelivered());
-
-            assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
-
-            _message.setDeliveredToConsumer();
-
-            assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
-
-            assertFalse("Delivered Immediate message now be marked as so", _message.immediateAndNotDelivered());
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
-    }
-
-    public void testNotImmediateAndNotDelivered()
-    {
-        _message = newMessage();
-
-        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
-        int bodySize = 0;
-
-        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
-        props.setAppId("HandleTest");
-
-        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
-        try
-        {
-            _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
-            assertFalse("Undelivered Non-Immediate message should not result in true.", _message.immediateAndNotDelivered());
-
-            assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
-
-            _message.setDeliveredToConsumer();
-
-            assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
-
-            assertFalse("Delivered Non-Immediate message not change this return", _message.immediateAndNotDelivered());
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
-    }
-
-    public void testExpiry()
-    {
-        _message = newMessage();
-
-        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
-        int bodySize = 0;
-
-        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
-        props.setAppId("HandleTest");
-
-        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
-        ReentrantLock waitLock = new ReentrantLock();
-        Condition wait = waitLock.newCondition();
-        try
-        {
-            _message.setExpiration(System.currentTimeMillis() + 10L);
-
-            _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
-            assertFalse("New messages should not be expired.", _message.expired());
-
-            final long MILLIS =1000000L;
-            long waitTime = 20 * MILLIS;
-
-            while (waitTime > 0)
-            {
-                try
-                {
-                    waitLock.lock();
-
-                    waitTime = wait.awaitNanos(waitTime);
-                }
-                catch (InterruptedException e)
-                {
-                    //Stop if we are interrupted
-                    fail(e.getMessage());
-                }
-                finally
-                {
-                    waitLock.unlock();
-                }
-
-            }
-
-            assertTrue("After a sleep messages should now be expired.", _message.expired());
-
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
-    }
-
-
-        public void testNoExpiry()
-    {
-        _message = newMessage();
-
-        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
-        int bodySize = 0;
-
-        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
-        props.setAppId("HandleTest");
-
-        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
-        ReentrantLock waitLock = new ReentrantLock();
-        Condition wait = waitLock.newCondition();
-        try
-        {
-
-            _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
-            assertFalse("New messages should not be expired.", _message.expired());
-
-            final long MILLIS =1000000L;
-            long waitTime = 10 * MILLIS;
-
-            while (waitTime > 0)
-            {
-                try
-                {
-                    waitLock.lock();
-
-                    waitTime = wait.awaitNanos(waitTime);
-                }
-                catch (InterruptedException e)
-                {
-                    //Stop if we are interrupted
-                    fail(e.getMessage());
-                }
-                finally
-                {
-                    waitLock.unlock();
-                }
-
-            }
-
-            assertFalse("After a sleep messages without an expiry should not expire.", _message.expired());
-
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
-    }
-
+ 
 }

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java Tue Mar 10 23:10:57 2009
@@ -27,6 +27,7 @@
 import junit.framework.TestCase;
 
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.qpid.server.configuration.SecurityConfiguration;
@@ -79,7 +80,7 @@
         assertTrue(_authzManager.authorisePurge(_session, queue));
     }
 
-    public void testACLManagerConfigurationPluginManagerACLPlugin()
+    public void testACLManagerConfigurationPluginManagerACLPlugin() throws ConfigurationException
     {
         _authzManager = new ACLManager(_conf, _pluginManager, ExchangeDenier.FACTORY);
         
@@ -87,7 +88,7 @@
         assertFalse(_authzManager.authoriseDelete(_session, exchange));
     }
     
-    public void testConfigurePlugins()
+    public void testConfigurePlugins() throws ConfigurationException
     {
         Configuration hostConfig = new PropertiesConfiguration();
         hostConfig.setProperty("queueDenier", "thisoneneither");

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBeanTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBeanTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBeanTest.java Tue Mar 10 23:10:57 2009
@@ -21,103 +21,213 @@
 
 package org.apache.qpid.server.security.access.management;
 
+import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 
-import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.management.MBeanInvocationHandlerImpl;
+import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
 
 import junit.framework.TestCase;
 
+/* Note: The main purpose is to test the jmx access rights file manipulation 
+ * within AMQUserManagementMBean. The Principal Databases are tested by their own tests, 
+ * this test just exercises their usage in AMQUserManagementMBean. 
+ */
 public class AMQUserManagementMBeanTest extends TestCase
 {
-    private Base64MD5PasswordFilePrincipalDatabase _database;
+    private PlainPasswordFilePrincipalDatabase _database;
     private AMQUserManagementMBean _amqumMBean;
+    
+    private File _passwordFile;
+    private File _accessFile;
 
-    private static final String _QPID_HOME =  System.getProperty("QPID_HOME");
-
-    private static final String USERNAME = "testuser";
-    private static final String PASSWORD = "password";
-    private static final String JMXRIGHTS = "admin";
-    private static final String TEMP_PASSWORD_FILE_NAME = "tempPasswordFile.tmp";
-    private static final String TEMP_JMXACCESS_FILE_NAME = "tempJMXAccessFile.tmp";
+    private static final String TEST_USERNAME = "testuser";
+    private static final String TEST_PASSWORD = "password";
 
     @Override
     protected void setUp() throws Exception
     {
-        assertNotNull("QPID_HOME not set", _QPID_HOME);
-
-        _database = new Base64MD5PasswordFilePrincipalDatabase();
+        _database = new PlainPasswordFilePrincipalDatabase();
         _amqumMBean = new AMQUserManagementMBean();
+        loadFreshTestPasswordFile();
+        loadFreshTestAccessFile();
     }
 
     @Override
     protected void tearDown() throws Exception
     {
-        File testFile = new File(_QPID_HOME + File.separator + TEMP_JMXACCESS_FILE_NAME + ".tmp");
-        if (testFile.exists())
+            _passwordFile.delete();
+            _accessFile.delete();
+    }
+
+    public void testDeleteUser()
+    {
+        loadFreshTestPasswordFile();
+        loadFreshTestAccessFile();
+
+        //try deleting a non-existent user
+        assertFalse(_amqumMBean.deleteUser("made.up.username"));
+        
+        assertTrue(_amqumMBean.deleteUser(TEST_USERNAME));
+    }
+    
+    public void testDeleteUserIsSavedToAccessFile()
+    {
+        loadFreshTestPasswordFile();
+        loadFreshTestAccessFile();
+
+        assertTrue(_amqumMBean.deleteUser(TEST_USERNAME));
+
+        //check the access rights were actually deleted from the file
+        try{
+            BufferedReader reader = new BufferedReader(new FileReader(_accessFile));
+
+            //check the 'generated by' comment line is present
+            assertTrue("File has no content", reader.ready());
+            assertTrue("'Generated by' comment line was missing",reader.readLine().contains("Generated by " +
+                                                      "AMQUserManagementMBean Console : Last edited by user:"));
+
+            //there should also be a modified date/time comment line
+            assertTrue("File has no modified date/time comment line", reader.ready());
+            assertTrue("Modification date/time comment line was missing",reader.readLine().startsWith("#"));
+            
+            //the access file should not contain any further data now as we just deleted the only user
+            assertFalse("User access data was present when it should have been deleted", reader.ready());
+        }
+        catch (IOException e)
         {
-            testFile.delete();
+            fail("Unable to valdate file contents due to:" + e.getMessage());
         }
+        
+    }
 
-        testFile = new File(_QPID_HOME + File.separator + TEMP_JMXACCESS_FILE_NAME + ".old");
-        if (testFile.exists())
+    public void testSetRights()
+    {
+        loadFreshTestPasswordFile();
+        loadFreshTestAccessFile();
+        
+        assertFalse(_amqumMBean.setRights("made.up.username", true, false, false));
+        
+        assertTrue(_amqumMBean.setRights(TEST_USERNAME, true, false, false));
+        assertTrue(_amqumMBean.setRights(TEST_USERNAME, false, true, false));
+        assertTrue(_amqumMBean.setRights(TEST_USERNAME, false, false, true));
+    }
+    
+    public void testSetRightsIsSavedToAccessFile()
+    {
+        loadFreshTestPasswordFile();
+        loadFreshTestAccessFile();
+        
+        assertTrue(_amqumMBean.setRights(TEST_USERNAME, false, false, true));
+        
+        //check the access rights were actually updated in the file
+        try{
+            BufferedReader reader = new BufferedReader(new FileReader(_accessFile));
+
+            //check the 'generated by' comment line is present
+            assertTrue("File has no content", reader.ready());
+            assertTrue("'Generated by' comment line was missing",reader.readLine().contains("Generated by " +
+                                                      "AMQUserManagementMBean Console : Last edited by user:"));
+
+            //there should also be a modified date/time comment line
+            assertTrue("File has no modified date/time comment line", reader.ready());
+            assertTrue("Modification date/time comment line was missing",reader.readLine().startsWith("#"));
+            
+            //the next line should hold the test user's access rights, now updated to admin
+            assertTrue("User access data was not updated in the access file", 
+                    reader.readLine().equals(TEST_USERNAME + "=" + MBeanInvocationHandlerImpl.ADMIN));
+            
+            //the access file should not contain any further data as the test user is its only entry
+            assertFalse("Additional user access data was present when there should be no more", reader.ready());
+        }
+        catch (IOException e)
         {
-            testFile.delete();
+            fail("Unable to valdate file contents due to:" + e.getMessage());
         }
+    }
 
-        testFile = new File(_QPID_HOME + File.separator + TEMP_PASSWORD_FILE_NAME + ".tmp");
-        if (testFile.exists())
+    public void testMBeanVersion()
+    {
+        try
         {
-            testFile.delete();
+            ObjectName name = _amqumMBean.getObjectName();
+            assertEquals(AMQUserManagementMBean.VERSION, Integer.parseInt(name.getKeyProperty("version")));
         }
-
-        testFile = new File(_QPID_HOME + File.separator + TEMP_PASSWORD_FILE_NAME + ".old");
-        if (testFile.exists())
+        catch (MalformedObjectNameException e)
         {
-            testFile.delete();
+            fail(e.getMessage());
         }
     }
 
-    public void testDeleteUser()
+    public void testSetAccessFileWithMissingFile()
     {
-        loadTestPasswordFile();
-        loadTestAccessFile();
-
-        boolean deleted = false;
+        try
+        {
+            _amqumMBean.setAccessFile("made.up.filename");
+        }
+        catch (IOException e)
+        {
+            fail("Should not have been an IOE." + e.getMessage());
+        }
+        catch (ConfigurationException e)
+        {
+            assertTrue(e.getMessage(), e.getMessage().endsWith("does not exist"));
+        }
+    }
 
+    public void testSetAccessFileWithReadOnlyFile()
+    {
+        File testFile = null;
         try
         {
-            deleted = _amqumMBean.deleteUser(USERNAME);
+            testFile = File.createTempFile(this.getClass().getName(),".access.readonly");
+            BufferedWriter passwordWriter = new BufferedWriter(new FileWriter(testFile, false));
+            passwordWriter.write(TEST_USERNAME + ":" + TEST_PASSWORD);
+            passwordWriter.newLine();
+            passwordWriter.flush();
+            passwordWriter.close();
+
+            testFile.setReadOnly();
+            _amqumMBean.setAccessFile(testFile.getPath());
         }
-        catch(Exception e){
-            fail("Unable to delete user: " + e.getMessage());
+        catch (IOException e)
+        {
+            fail("Access file was not created." + e.getMessage());
+        }
+        catch (ConfigurationException e)
+        {
+            fail("There should not have been a configuration exception." + e.getMessage());
         }
 
-        assertTrue(deleted);
+        testFile.delete();
     }
- 
-    
+
     // ============================ Utility methods =========================
 
-    private void loadTestPasswordFile()
+    private void loadFreshTestPasswordFile()
     {
         try
         {
-            File tempPasswordFile = new File(_QPID_HOME + File.separator + TEMP_PASSWORD_FILE_NAME);
-            if (tempPasswordFile.exists())
+            if(_passwordFile == null)
             {
-                tempPasswordFile.delete();
+                _passwordFile = File.createTempFile(this.getClass().getName(),".password");
             }
-            tempPasswordFile.deleteOnExit();
 
-            BufferedWriter passwordWriter = new BufferedWriter(new FileWriter(tempPasswordFile));
-            passwordWriter.write(USERNAME + ":" + PASSWORD);
+            BufferedWriter passwordWriter = new BufferedWriter(new FileWriter(_passwordFile, false));
+            passwordWriter.write(TEST_USERNAME + ":" + TEST_PASSWORD);
             passwordWriter.newLine();
             passwordWriter.flush();
-
-            _database.setPasswordFile(tempPasswordFile.toString());
+            passwordWriter.close();
+            _database.setPasswordFile(_passwordFile.toString());
             _amqumMBean.setPrincipalDatabase(_database);
         }
         catch (IOException e)
@@ -126,27 +236,36 @@
         }
     }
 
-    private void loadTestAccessFile()
+    private void loadFreshTestAccessFile()
     {
         try
         {
-            File tempAccessFile = new File(_QPID_HOME + File.separator + TEMP_JMXACCESS_FILE_NAME);
-            if (tempAccessFile.exists())
+            if(_accessFile == null)
             {
-                tempAccessFile.delete();
+                _accessFile = File.createTempFile(this.getClass().getName(),".access");
             }
-            tempAccessFile.deleteOnExit();
-
-            BufferedWriter accessWriter = new BufferedWriter(new FileWriter(tempAccessFile));
-            accessWriter.write(USERNAME + "=" + JMXRIGHTS);
+            
+            BufferedWriter accessWriter = new BufferedWriter(new FileWriter(_accessFile,false));
+            accessWriter.write("#Last Updated By comment");
+            accessWriter.newLine();
+            accessWriter.write("#Date/time comment");
+            accessWriter.newLine();
+            accessWriter.write(TEST_USERNAME + "=" + MBeanInvocationHandlerImpl.READONLY);
             accessWriter.newLine();
             accessWriter.flush();
+            accessWriter.close();
+        }
+        catch (IOException e)
+        {
+            fail("Unable to create test access file: " + e.getMessage());
+        }
 
-            _amqumMBean.setAccessFile(tempAccessFile.toString());
+        try{
+            _amqumMBean.setAccessFile(_accessFile.toString());
         }
         catch (Exception e)
         {
-            fail("Unable to create test access file: " + e.getMessage());
+            fail("Unable to set access file: " + e.getMessage());
         }
     }
 }

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java Tue Mar 10 23:10:57 2009
@@ -22,8 +22,10 @@
 
 import junit.framework.TestCase;
 
+import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.login.AccountNotFoundException;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 
 import java.io.BufferedReader;
@@ -33,7 +35,9 @@
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.security.Principal;
+import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -41,12 +45,38 @@
 {
 
     private static final String TEST_COMMENT = "# Test Comment";
-    private String USERNAME = "testUser";
-    private String _username = this.getClass().getName()+"username";
-    private char[] _password = "password".toCharArray();
-    private Principal _principal = new UsernamePrincipal(_username);
+
+    private static final String USERNAME = "testUser";
+    private static final String PASSWORD = "guest";
+    private static final String PASSWORD_B64MD5HASHED = "CE4DQ6BIb/BVMN9scFyLtA==";
+    private static char[] PASSWORD_MD5_CHARS;
+    private static final String PRINCIPAL_USERNAME = "testUserPrincipal";
+    private static final Principal PRINCIPAL = new UsernamePrincipal(PRINCIPAL_USERNAME);
     private Base64MD5PasswordFilePrincipalDatabase _database;
     private File _pwdFile;
+    
+    static
+    {
+        try
+        {
+            Base64 b64 = new Base64();
+            byte[] md5passBytes = PASSWORD_B64MD5HASHED.getBytes(Base64MD5PasswordFilePrincipalDatabase.DEFAULT_ENCODING);
+            byte[] decoded = b64.decode(md5passBytes);
+
+            PASSWORD_MD5_CHARS = new char[decoded.length];
+
+            int index = 0;
+            for (byte c : decoded)
+            {
+                PASSWORD_MD5_CHARS[index++] = (char) c;
+            }
+        }
+        catch (UnsupportedEncodingException e)
+        {
+            fail("Unable to perform B64 decode to get the md5 char[] password");
+        }
+    }
+    
 
     public void setUp() throws Exception
     {
@@ -111,7 +141,56 @@
 
         loadPasswordFile(testFile);
 
-        final String CREATED_PASSWORD = "createdPassword";
+
+        Principal principal = new Principal()
+        {
+            public String getName()
+            {
+                return USERNAME;
+            }
+        };
+
+        assertTrue("New user not created.", _database.createPrincipal(principal, PASSWORD.toCharArray()));
+        
+        PasswordCallback callback = new PasswordCallback("prompt",false);
+        try
+        {
+            _database.setPassword(principal, callback);
+        }
+        catch (AccountNotFoundException e)
+        {
+            fail("user account did not exist");
+        }
+        assertTrue("Password returned was incorrect.", Arrays.equals(PASSWORD_MD5_CHARS, callback.getPassword()));
+
+        loadPasswordFile(testFile);
+
+        try
+        {
+            _database.setPassword(principal, callback);
+        }
+        catch (AccountNotFoundException e)
+        {
+            fail("user account did not exist");
+        }
+        assertTrue("Password returned was incorrect.", Arrays.equals(PASSWORD_MD5_CHARS, callback.getPassword()));
+        
+        assertNotNull("Created User was not saved", _database.getUser(USERNAME));
+
+        assertFalse("Duplicate user created.", _database.createPrincipal(principal, PASSWORD.toCharArray()));
+
+        testFile.delete();
+    }
+    
+    public void testCreatePrincipalIsSavedToFile()
+    {
+
+        File testFile = createPasswordFile(1, 0);
+
+        loadPasswordFile(testFile);
+        
+        final String CREATED_PASSWORD = "guest";
+        final String CREATED_B64MD5HASHED_PASSWORD = "CE4DQ6BIb/BVMN9scFyLtA==";
         final String CREATED_USERNAME = "createdUser";
 
         Principal principal = new Principal()
@@ -122,16 +201,37 @@
             }
         };
 
-        assertTrue("New user not created.", _database.createPrincipal(principal, CREATED_PASSWORD.toCharArray()));
+        _database.createPrincipal(principal, CREATED_PASSWORD.toCharArray());
 
-        loadPasswordFile(testFile);
+        try
+        {
+            BufferedReader reader = new BufferedReader(new FileReader(testFile));
+
+            assertTrue("File has no content", reader.ready());
+
+            assertEquals("Comment line has been corrupted.", TEST_COMMENT, reader.readLine());
 
-        assertNotNull("Created User was not saved", _database.getUser(CREATED_USERNAME));
+            assertTrue("File is missing user data.", reader.ready());
 
-        assertFalse("Duplicate user created.", _database.createPrincipal(principal, CREATED_PASSWORD.toCharArray()));
+            String userLine = reader.readLine();
+
+            String[] result = Pattern.compile(":").split(userLine);
 
+            assertEquals("User line not complete '" + userLine + "'", 2, result.length);
+
+            assertEquals("Username not correct,", CREATED_USERNAME, result[0]);
+            assertEquals("Password not correct,", CREATED_B64MD5HASHED_PASSWORD, result[1]);
+
+            assertFalse("File has more content", reader.ready());
+
+        }
+        catch (IOException e)
+        {
+            fail("Unable to valdate file contents due to:" + e.getMessage());
+        }
         testFile.delete();
     }
+    
 
     public void testDeletePrincipal()
     {
@@ -228,8 +328,8 @@
 
         assertNotNull(testUser);
 
-        String NEW_PASSWORD = "NewPassword";
-        String NEW_PASSWORD_HASH = "TmV3UGFzc3dvcmQ=";
+        String NEW_PASSWORD = "guest";
+        String NEW_PASSWORD_HASH = "CE4DQ6BIb/BVMN9scFyLtA==";
         try
         {
             _database.updatePassword(testUser, NEW_PASSWORD.toCharArray());
@@ -268,7 +368,7 @@
         testFile.delete();
     }
 
-    public void testSetPasswordWithMissingFile()
+    public void testSetPasswordFileWithMissingFile()
     {
         try
         {
@@ -285,7 +385,7 @@
 
     }
 
-    public void testSetPasswordWithReadOnlyFile()
+    public void testSetPasswordFileWithReadOnlyFile()
     {
 
         File testFile = createPasswordFile(0, 0);
@@ -310,28 +410,38 @@
     
     public void testCreateUserPrincipal() throws IOException
     {
-        _database.createPrincipal(_principal, _password);
-        Principal newPrincipal = _database.getUser(_username);
+        _database.createPrincipal(PRINCIPAL, PASSWORD.toCharArray());
+        Principal newPrincipal = _database.getUser(PRINCIPAL_USERNAME);
         assertNotNull(newPrincipal);
-        assertEquals(_principal.getName(), newPrincipal.getName());
+        assertEquals(PRINCIPAL.getName(), newPrincipal.getName());
     }
     
     public void testVerifyPassword() throws IOException, AccountNotFoundException
     {
         testCreateUserPrincipal();
         //assertFalse(_pwdDB.verifyPassword(_username, null));
-        assertFalse(_database.verifyPassword(_username, new char[]{}));
-        assertFalse(_database.verifyPassword(_username, "massword".toCharArray()));
-        assertTrue(_database.verifyPassword(_username, _password));
+        assertFalse(_database.verifyPassword(PRINCIPAL_USERNAME, new char[]{}));
+        assertFalse(_database.verifyPassword(PRINCIPAL_USERNAME, (PASSWORD+"z").toCharArray()));
+        assertTrue(_database.verifyPassword(PRINCIPAL_USERNAME, PASSWORD.toCharArray()));
+        
+        try
+        {
+            _database.verifyPassword("made.up.username", PASSWORD.toCharArray());
+            fail("Should not have been able to verify this non-existant users password.");
+        }
+        catch (AccountNotFoundException e)
+        {
+            // pass
+        }
     }
     
     public void testUpdatePassword() throws IOException, AccountNotFoundException 
     {
         testCreateUserPrincipal();
         char[] newPwd = "newpassword".toCharArray();
-        _database.updatePassword(_principal, newPwd);
-        assertFalse(_database.verifyPassword(_username, _password));
-        assertTrue(_database.verifyPassword(_username, newPwd));
+        _database.updatePassword(PRINCIPAL, newPwd);
+        assertFalse(_database.verifyPassword(PRINCIPAL_USERNAME, PASSWORD.toCharArray()));
+        assertTrue(_database.verifyPassword(PRINCIPAL_USERNAME, newPwd));
     }
-    
+
 }

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java Tue Mar 10 23:10:57 2009
@@ -34,7 +34,7 @@
 
     String USERNAME = "username";
     String PASSWORD = "password";
-    String HASHED_PASSWORD = "cGFzc3dvcmQ=";
+    String B64_ENCODED_PASSWORD = "cGFzc3dvcmQ=";
 
     public void testToLongArrayConstructor()
     {
@@ -57,11 +57,11 @@
     {
         try
         {
-            HashedUser user = new HashedUser(new String[]{USERNAME, HASHED_PASSWORD});
+            HashedUser user = new HashedUser(new String[]{USERNAME, B64_ENCODED_PASSWORD});
             assertEquals("Username incorrect", USERNAME, user.getName());
             int index = 0;
 
-            char[] hash = HASHED_PASSWORD.toCharArray();
+            char[] hash = B64_ENCODED_PASSWORD.toCharArray();
 
             try
             {

Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Tue Mar 10 23:10:57 2009
@@ -30,10 +30,13 @@
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
+import org.apache.log4j.Logger;
 
 public class MockSubscription implements Subscription
 {
+    private static final Logger _logger = Logger.getLogger(MockSubscription.class);
 
     private boolean _closed = false;
     private AMQShortString tag = new AMQShortString("mocktag");
@@ -41,8 +44,12 @@
     private StateListener _listener = null;
     private QueueEntry lastSeen = null;
     private State _state = State.ACTIVE;
-    private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
+    private ArrayList<QueueEntry> _queueEntries = new ArrayList<QueueEntry>();
     private final Lock _stateChangeLock = new ReentrantLock();
+    private ArrayList<AMQMessage> _messages = new ArrayList<AMQMessage>();
+
+
+
 
     public void close()
     {
@@ -136,10 +143,14 @@
     {
     }
 
-    public void send(QueueEntry msg) throws AMQException
+    public void send(QueueEntry entry) throws AMQException
     {
-        lastSeen = msg;
-        messages.add(msg);
+        _logger.info("Sending Message(" + entry.debugIdentity() + ")  to subscription:" + this);
+
+        lastSeen = entry;
+        _queueEntries.add(entry);
+        _messages.add(entry.getMessage());
+        entry.setDeliveredToSubscription();        
     }
 
     public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
@@ -173,8 +184,14 @@
         return false;
     }
 
-    public ArrayList<QueueEntry> getMessages()
+    public ArrayList<QueueEntry> getQueueEntries()
     {
-        return messages;
+        return _queueEntries;
     }
+
+    public ArrayList<AMQMessage> getMessages()
+    {
+        return _messages;
+    }
+
 }

Modified: qpid/branches/qpid-1673/qpid/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/build.deps?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/build.deps (original)
+++ qpid/branches/qpid-1673/qpid/java/build.deps Tue Mar 10 23:10:57 2009
@@ -92,7 +92,6 @@
 client-example.libs=${client.libs}
 testkit.libs=${client.libs}
 
-
 ibm-icu=lib/com.ibm.icu_3.8.1.v20080530.jar
 ecl-core-jface=lib/org.eclipse.jface_3.4.1.M20080827-2000.jar
 ecl-core-jface-databinding=lib/org.eclipse.jface.databinding_1.2.1.M20080827-0800a.jar
@@ -114,30 +113,41 @@
 ecl-ui=lib/org.eclipse.ui_3.4.1.M20080910-0800.jar
 ecl-ui-forms=lib/org.eclipse.ui.forms_3.3.101.v20080708_34x.jar
 ecl-ui-workbench=lib/org.eclipse.ui.workbench_3.4.1.M20080827-0800a.jar
+apache-commons-codec=lib/org.apache.commons.codec_1.3.0.v20080530-1600.jar
 
 ecl-swt-win32-win32-x86=lib/org.eclipse.swt.win32.win32.x86_3.4.1.v3449c.jar
 ecl-equinox-launcher-win32-win32-x86=lib/org.eclipse.equinox.launcher.win32.win32.x86_1.0.101.R34x_v20080731/**
 ecl-swt-linux-gtk-x86=lib/org.eclipse.swt.gtk.linux.x86_3.4.1.v3449c.jar
 ecl-equinox-launcher-linux-gtk-x86=lib/org.eclipse.equinox.launcher.gtk.linux.x86_1.0.101.R34x_v20080805/**
+ecl-swt-linux-gtk-x86_64=lib/org.eclipse.swt.gtk.linux.x86_64_3.4.1.v3449c.jar
+ecl-equinox-launcher-linux-gtk-x86_64=lib/org.eclipse.equinox.launcher.gtk.linux.x86_64_1.0.101.R34x_v20080731/**
 ecl-swt-macosx-carbon=lib/org.eclipse.swt.carbon.macosx_3.4.1.v3449c.jar
 ecl-equinox-launcher-macosx-carbon=lib/org.eclipse.equinox.launcher.carbon.macosx_1.0.101.R34x_v20080731/**
+ecl-swt-solaris-gtk-sparc=lib/org.eclipse.swt.gtk.solaris.sparc_3.4.1.v3449c.jar
+ecl-equinox-launcher-solaris-gtk-sparc=lib/org.eclipse.equinox.launcher.gtk.solaris.sparc_1.0.101.R34x_v20080731/**
 
 management-eclipse-plugin-win32-win32-x86.libs=${management-eclipse-plugin.core-libs} \
     ${ecl-swt-win32-win32-x86} ${ecl-equinox-launcher-win32-win32-x86}
 management-eclipse-plugin-linux-gtk-x86.libs=${management-eclipse-plugin.core-libs} \
     ${ecl-swt-linux-gtk-x86} ${ecl-equinox-launcher-linux-gtk-x86}
+management-eclipse-plugin-linux-gtk-x86_64.libs=${management-eclipse-plugin.core-libs} \
+    ${ecl-swt-linux-gtk-x86_64} ${ecl-equinox-launcher-linux-gtk-x86_64}
 management-eclipse-plugin-macosx.libs=${management-eclipse-plugin.core-libs} \
     ${ecl-swt-macosx-carbon} ${ecl-equinox-launcher-macosx-carbon}
+management-eclipse-plugin-solaris-gtk-sparc.libs=${management-eclipse-plugin.core-libs} \
+    ${ecl-swt-solaris-gtk-sparc} ${ecl-equinox-launcher-solaris-gtk-sparc}
 
 management-eclipse-plugin.core-libs=${ibm-icu} ${ecl-core-jface} ${ecl-core-jface-databinding} \
     ${ecl-core-commands} ${ecl-core-contenttype} ${ecl-core-databinding} ${ecl-core-expressions} \
     ${ecl-core-jobs} ${ecl-core-runtime} ${ecl-core-runtime-compat-registry} ${ecl-equinox-app} \
     ${ecl-equinox-common} ${ecl-equinox-launcher} ${ecl-equinox-prefs} ${ecl-equinox-registry} \
-    ${ecl-help} ${ecl-osgi} ${ecl-swt} ${ecl-ui} ${ecl-ui-forms} ${ecl-ui-workbench}
+    ${ecl-help} ${ecl-osgi} ${ecl-swt} ${ecl-ui} ${ecl-ui-forms} ${ecl-ui-workbench} ${apache-commons-codec}
     
 management-eclipse-plugin.platform-libs=${ecl-equinox-launcher-win32-win32-x86} \
     ${ecl-equinox-launcher-linux-gtk-x86} ${ecl-equinox-launcher-macosx-carbon} \
-    ${ecl-swt-win32-win32-x86} ${ecl-swt-linux-gtk-x86} ${ecl-swt-macosx-carbon}
+    ${ecl-swt-win32-win32-x86} ${ecl-swt-linux-gtk-x86} ${ecl-swt-macosx-carbon} \
+    ${ecl-swt-linux-gtk-x86_64} ${ecl-equinox-launcher-linux-gtk-x86_64} \
+    ${ecl-swt-solaris-gtk-sparc} ${ecl-equinox-launcher-solaris-gtk-sparc}
 
 management-eclipse-plugin.libs=${management-eclipse-plugin.core-libs} ${management-eclipse-plugin.platform-libs}
     

Modified: qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java Tue Mar 10 23:10:57 2009
@@ -42,6 +42,8 @@
 
     public void opened(Session ssn) {}
 
+    public void resumed(Session ssn) {}
+
     public void message(Session ssn, MessageTransfer xfr)
     {
         System.out.println("Message: " + xfr);

Modified: qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java Tue Mar 10 23:10:57 2009
@@ -42,6 +42,8 @@
 
     public void opened(Session ssn) {}
 
+    public void resumed(Session ssn) {}
+
     public void message(Session ssn, MessageTransfer xfr)
     {
         System.out.println("Message: " + xfr);

Modified: qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java Tue Mar 10 23:10:57 2009
@@ -32,6 +32,8 @@
 
     public void opened(Session ssn) {}
 
+    public void resumed(Session ssn) {}
+
     public void message(Session ssn, MessageTransfer xfr)
     {
         String body = xfr.getBodyString();

Modified: qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java Tue Mar 10 23:10:57 2009
@@ -44,6 +44,8 @@
 
     public void opened(Session ssn) {}
 
+    public void resumed(Session ssn) {}
+
     public void message(Session ssn, MessageTransfer xfr)
     {
         String body = xfr.getBodyString();

Modified: qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java Tue Mar 10 23:10:57 2009
@@ -40,6 +40,8 @@
 
     public void opened(Session ssn) {}
 
+    public void resumed(Session ssn) {}
+
     public void message(Session ssn, MessageTransfer xfr)
     {
         DeliveryProperties dp = xfr.getHeader().get(DeliveryProperties.class);

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Mar 10 23:10:57 2009
@@ -268,6 +268,13 @@
     //Indicates whether persistent messages are synchronized
     private boolean _syncPersistence;
 
+    //Indicates whether we need to sync on every message ack
+    private boolean _syncAck;
+    
+    //Indicates the sync publish options (persistent|all)
+    //By default it's async publish
+    private String _syncPublish = ""; 
+    
     /**
      * @param broker      brokerdetails
      * @param username    username
@@ -348,25 +355,53 @@
     public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
     {
         // set this connection maxPrefetch
-        if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
+        if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
         {
-            _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+            _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
         }
         else
         {
             // use the defaul value set for all connections
             _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
-                                                                               ClientProperties.MAX_PREFETCH_DEFAULT));
+                    ClientProperties.MAX_PREFETCH_DEFAULT));
         }
 
-        if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
+        if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
         {
-            _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE));
+            _syncPersistence = 
+                Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
+            _logger.warn("sync_persistence is a deprecated property, " +
+            		"please use sync_publish={persistent|all} instead");
         }
         else
         {
             // use the defaul value set for all connections
             _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
+            if (_syncPersistence)
+            {
+                _logger.warn("sync_persistence is a deprecated property, " +
+                        "please use sync_publish={persistent|all} instead");
+            }
+        }
+
+        if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
+        {
+            _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
+        }
+        else
+        {
+            // use the default value set for all connections
+            _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
+        }
+
+        if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
+        {
+            _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
+        }
+        else
+        {
+            // no URL option: use the JVM-wide sync-publish property (keeps the async default if unset)
+            _syncPublish = System.getProperty(ClientProperties.SYNC_PUBLISH_PROP_NAME, _syncPublish);
+        }
         
         _failoverPolicy = new FailoverPolicy(connectionURL, this);
@@ -1469,6 +1504,19 @@
         return _syncPersistence;
     }
     
+    /**
+     * Indicates whether we need to sync on every message ack
+     */
+    public boolean getSyncAck()
+    {
+        return _syncAck;
+    }
+    
+    public String getSyncPublish()
+    {
+        return _syncPublish;
+    }
+        
     public void setIdleTimeout(long l)
     {
         _delegate.setIdleTimeout(l);

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Tue Mar 10 23:10:57 2009
@@ -239,12 +239,6 @@
                 {
                     _conn.failoverPrep();
                     _qpidConnection.resume();
-
-                    if (_conn.firePreResubscribe())
-                    {
-                        _conn.resubscribeSessions();
-                    }
-
                     _conn.fireFailoverComplete();
                     return;
                 }

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java Tue Mar 10 23:10:57 2009
@@ -22,14 +22,12 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.qpid.AMQException;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Queue;
 import javax.jms.QueueBrowser;
-
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,38 +88,11 @@
     {
         checkState();
         final BasicMessageConsumer consumer =
-            (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+                (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
 
         _consumers.add(consumer);
 
-        return new Enumeration()
-        {
-
-            Message _nextMessage = consumer == null ? null : consumer.receive(1000);
-
-            public boolean hasMoreElements()
-            {
-                _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
-                return (_nextMessage != null);
-            }
-
-            public Object nextElement()
-            {
-                Message msg = _nextMessage;
-                try
-                {
-                    _logger.info("QB:nextElement about to receive");
-                    _nextMessage = consumer.receive(1000);
-                    _logger.info("QB:nextElement received:" + _nextMessage);
-                }
-                catch (JMSException e)
-                {
-                    _logger.warn("Exception caught while queue browsing", e);
-                    _nextMessage = null;
-                }
-                return msg;
-            }
-        };
+        return new QueueBrowserEnumeration(consumer);
     }
 
     public void close() throws JMSException
@@ -134,4 +105,39 @@
         _consumers.clear();
     }
 
+    private class QueueBrowserEnumeration implements Enumeration
+    {
+        Message _nextMessage;
+        private BasicMessageConsumer _consumer;
+
+        public QueueBrowserEnumeration(BasicMessageConsumer consumer) throws JMSException
+        {
+            _nextMessage = consumer == null ? null : consumer.receiveBrowse();
+            _logger.info("QB:created with first element:" + _nextMessage);
+            _consumer = consumer;
+        }
+
+        public boolean hasMoreElements()
+        {
+            _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+            return (_nextMessage != null);
+        }
+
+        public Object nextElement()
+        {
+            Message msg = _nextMessage;
+            try
+            {
+                _logger.info("QB:nextElement about to receive");
+                _nextMessage = _consumer.receiveBrowse();
+                _logger.info("QB:nextElement received:" + _nextMessage);
+            }
+            catch (JMSException e)
+            {
+                _logger.warn("Exception caught while queue browsing", e);
+                _nextMessage = null;
+            }
+            return msg;
+        }
+    }    
 }

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Mar 10 23:10:57 2009
@@ -575,12 +575,19 @@
     public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
                           final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
     {
+        bindQueue(queueName, routingKey, arguments, exchangeName, destination, false);
+    }
+
+    public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+                          final AMQShortString exchangeName, final AMQDestination destination,
+                          final boolean nowait) throws AMQException
+    {
         /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
+                sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait);
                 return null;
             }
         }, _connection).execute();
@@ -595,7 +602,8 @@
     }
 
     public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-                                       final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException;
+                                       final AMQShortString exchangeName, AMQDestination destination,
+                                       final boolean nowait) throws AMQException, FailoverException;
 
     /**
      * Closes the session.
@@ -1815,6 +1823,11 @@
     void failoverPrep()
     {
         startDispatcherIfNecessary();
+        syncDispatchQueue();
+    }
+
+    void syncDispatchQueue()
+    {
         final CountDownLatch signal = new CountDownLatch(1);
         _queue.add(new Dispatchable() {
             public void dispatch(AMQSession ssn)
@@ -1828,7 +1841,7 @@
         }
         catch (InterruptedException e)
         {
-            // pass
+            throw new RuntimeException(e);
         }
     }
 
@@ -1859,6 +1872,11 @@
         _inRecovery = inRecovery;
     }
 
+    boolean isStarted()
+    {
+        return _startedAtLeastOnce.get();
+    }
+
     /**
      * Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
      *
@@ -2281,7 +2299,13 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                          final boolean noLocal)
+                                          final boolean noLocal) throws AMQException
+    {
+        return declareQueue(amqd, protocolHandler, noLocal, false);
+    }
+
+    protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+                                          final boolean noLocal, final boolean nowait)
             throws AMQException
     {
         /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -2296,14 +2320,15 @@
                             amqd.setQueueName(protocolHandler.generateQueueName());
                         }
 
-                        sendQueueDeclare(amqd, protocolHandler);
+                        sendQueueDeclare(amqd, protocolHandler, nowait);
 
                         return amqd.getAMQQueueName();
                     }
                 }, _connection).execute();
     }
 
-    public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException;
+    public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+                                          final boolean nowait) throws AMQException, FailoverException;
 
     /**
      * Undeclares the specified queue.
@@ -2416,14 +2441,14 @@
 
         AMQProtocolHandler protocolHandler = getProtocolHandler();
 
-        declareExchange(amqd, protocolHandler, false);
+        declareExchange(amqd, protocolHandler, nowait);
 
-        AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal());
+        AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
 
         // store the consumer queue name
         consumer.setQueuename(queueName);
 
-        bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
+        bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
 
         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
         if (!_immediatePrefetch)
@@ -2455,11 +2480,7 @@
 
         try
         {
-            consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
-        }
-        catch (JMSException e) // thrown by getMessageSelector
-        {
-            throw new AMQException(null, e.getMessage(), e);
+            consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector);
         }
         catch (FailoverException e)
         {
@@ -2531,8 +2552,9 @@
 
         for (C consumer : consumers)
         {
-            consumer.failedOver();
+            consumer.failedOverPre();
             registerConsumer(consumer, true);
+            consumer.failedOverPost();
         }
     }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org