You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/09/26 12:42:57 UTC

svn commit: r579574 - in /incubator/qpid/branches/M2.1/java: broker/src/main/java/org/apache/qpid/server/queue/ broker/src/test/java/org/apache/qpid/server/queue/ broker/src/test/java/org/apache/qpid/server/store/ systests/src/main/java/org/apache/qpid...

Author: ritchiem
Date: Wed Sep 26 03:42:54 2007
New Revision: 579574

URL: http://svn.apache.org/viewvc?rev=579574&view=rev
Log:
QPID-610 : Fix for ManagementConcole NO_ACK & Msg Expire leaks. Updated AMQQueueMBeanTest to verify the contents of the MessageStore after clearing. 
All the MC features only dequeued the message and didn't correctly decrementReference.
The MessageReturnTest was failing due to the TimeToLive test causing messages to be left on the store. The expired messages were not having the reference decremented.

Added:
    incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java   (contents, props changed)
      - copied, changed from r579147, incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
Modified:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=579574&r1=579573&r2=579574&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Sep 26 03:42:54 2007
@@ -581,7 +581,7 @@
     /** Removes the AMQMessage from the top of the queue. */
     public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
     {
-        _deliveryMgr.removeAMessageFromTop(storeContext);
+        _deliveryMgr.removeAMessageFromTop(storeContext, this);
     }
 
     /** removes all the messages from the queue. */

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=579574&r1=579573&r2=579574&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Wed Sep 26 03:42:54 2007
@@ -333,7 +333,7 @@
 
                 if (!acks)
                 {
-                   msg.decrementReference(channel.getStoreContext());
+                    msg.decrementReference(channel.getStoreContext());
                 }
             }
             finally
@@ -407,11 +407,16 @@
      *
      * @throws AMQException
      */
-    public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
+    public void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException
     {
         _lock.lock();
 
         AMQMessage message = _messages.poll();
+
+        message.dequeue(storeContext, queue);
+
+        message.decrementReference(storeContext);
+
         if (message != null)
         {
             _totalMessageSize.addAndGet(-message.getSize());
@@ -434,6 +439,9 @@
                 _messages.poll();
 
                 _queue.dequeue(storeContext, msg);
+
+                msg.decrementReference(_reapingStoreContext);
+
                 msg = getNextMessage();
                 count++;
             }
@@ -478,6 +486,8 @@
 
                 // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
                 message.dequeue(_reapingStoreContext, _queue);
+
+                message.decrementReference(_reapingStoreContext);
 
                 if (_log.isInfoEnabled())
                 {

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?rev=579574&r1=579573&r2=579574&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Wed Sep 26 03:42:54 2007
@@ -72,7 +72,7 @@
      */
     void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
 
-    void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
+    void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException;
 
     long clearAllMessages(StoreContext storeContext) throws AMQException;
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=579574&r1=579573&r2=579574&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Wed Sep 26 03:42:54 2007
@@ -24,10 +24,13 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -36,6 +39,8 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.mina.common.ByteBuffer;
 
 import javax.management.JMException;
 import java.util.LinkedList;
@@ -49,18 +54,16 @@
     private static long MESSAGE_SIZE = 1000;
     private AMQQueue _queue;
     private AMQQueueMBean _queueMBean;
-    private MessageStore _messageStore = new MemoryMessageStore();
+    private MessageStore _messageStore;
     private StoreContext _storeContext = new StoreContext();
-    private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
-                                                                                     null,
-                                                                                     new LinkedList<RequiredDeliveryException>(),
-                                                                                     new HashSet<Long>());
+    private TransactionalContext _transactionalContext;
     private VirtualHost _virtualHost;
+    private AMQProtocolSession _protocolSession;
 
-    public void testMessageCount() throws Exception
+    public void testMessageCountTransient() throws Exception
     {
         int messageCount = 10;
-        sendMessages(messageCount);
+        sendMessages(messageCount, false);
         assertTrue(_queueMBean.getMessageCount() == messageCount);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
         long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
@@ -73,6 +76,43 @@
         _queueMBean.clearQueue();
         assertTrue(_queueMBean.getMessageCount() == 0);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+        //Ensure that the data has been removed from the Store
+        verifyBrokerState();
+    }
+
+    public void testMessageCountPersistent() throws Exception
+    {
+        int messageCount = 10;
+        sendMessages(messageCount, true);
+        assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
+        assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+        long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+        assertTrue(_queueMBean.getQueueDepth() == queueDepth);
+
+        _queueMBean.deleteMessageFromTop();
+        assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
+        assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+        _queueMBean.clearQueue();
+        assertTrue(_queueMBean.getMessageCount() == 0);
+        assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+        //Ensure that the data has been removed from the Store
+        verifyBrokerState();
+    }
+
+    // todo: collect to a general testing class -duplicated from Systest/MessageReturntest
+    private void verifyBrokerState()
+    {
+
+        TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+
+        // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
+        assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());       
+        assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+        assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
+        assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
     }
 
     public void testConsumerCount() throws AMQException
@@ -86,26 +126,26 @@
         AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
         protocolSession.addChannel(channel);
 
-        _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false);
+        _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);
         assertTrue(_queueMBean.getActiveConsumerCount() == 1);
 
         SubscriptionSet _subscribers = (SubscriptionSet) mgr;
         SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory();
-        Subscription s1 =  subscriptionFactory.createSubscription(channel.getChannelId(),
-                                                                  protocolSession,
-                                                                  new AMQShortString("S1"),
-                                                                  false,
-                                                                  null,
-                                                                  true,
-                                                                  _queue);
-
-        Subscription s2 =  subscriptionFactory.createSubscription(channel.getChannelId(),
-                                                                  protocolSession,
-                                                                  new AMQShortString("S2"),
-                                                                  false,
-                                                                  null,
-                                                                  true,
-                                                                  _queue);
+        Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
+                                                                 protocolSession,
+                                                                 new AMQShortString("S1"),
+                                                                 false,
+                                                                 null,
+                                                                 true,
+                                                                 _queue);
+
+        Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
+                                                                 protocolSession,
+                                                                 new AMQShortString("S2"),
+                                                                 false,
+                                                                 null,
+                                                                 true,
+                                                                 _queue);
         _subscribers.addSubscriber(s1);
         _subscribers.addSubscriber(s2);
         assertTrue(_queueMBean.getActiveConsumerCount() == 3);
@@ -165,7 +205,7 @@
 
         }
 
-        AMQMessage msg = message(false);
+        AMQMessage msg = message(false, false);
         long id = msg.getMessageId();
         _queue.clearQueue(_storeContext);
 
@@ -184,7 +224,7 @@
         }
     }
 
-    private AMQMessage message(final boolean immediate) throws AMQException
+    private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException
     {
         MessagePublishInfo publish = new MessagePublishInfo()
         {
@@ -209,9 +249,11 @@
                 return null;
             }
         };
-                              
+
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes
+        contentHeaderBody.properties = new BasicContentHeaderProperties();
+        ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
         return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
     }
 
@@ -221,22 +263,38 @@
         super.setUp();
         IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
         _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+        _messageStore = _virtualHost.getMessageStore();
+
+        _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+                                                            null,
+                                                            new LinkedList<RequiredDeliveryException>(),
+                                                            new HashSet<Long>());
+
         _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
         _queueMBean = new AMQQueueMBean(_queue);
+
+        _protocolSession = new TestMinaProtocolSession();
     }
 
-    private void sendMessages(int messageCount) throws AMQException
+    private void sendMessages(int messageCount, boolean persistent) throws AMQException
     {
-        AMQMessage[] messages = new AMQMessage[messageCount];
-        for (int i = 0; i < messages.length; i++)
-        {
-            messages[i] = message(false);
-            messages[i].enqueue(_queue);
-            messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
-        }
         for (int i = 0; i < messageCount; i++)
         {
-            _queue.process(_storeContext, messages[i], false);
+            AMQMessage currentMessage = message(false, persistent);
+            currentMessage.enqueue(_queue);
+
+            // route header
+            currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+
+            // Add the body so we have somthing to test later
+            currentMessage.addContentBodyFrame(_storeContext,
+                                               _protocolSession.getRegistry()
+                                                       .getProtocolVersionMethodConverter()
+                                                       .convertToContentChunk(
+                                                       new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
+                                                                       MESSAGE_SIZE)));
+
+
         }
     }
 }

Copied: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (from r579147, incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?p2=incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java&p1=incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java&r1=579147&r2=579574&rev=579574&view=diff
==============================================================================
    (empty)

Propchange: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java?rev=579574&r1=579573&r2=579574&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java Wed Sep 26 03:42:54 2007
@@ -78,6 +78,8 @@
 
     private CountDownLatch _returns = new CountDownLatch(1);
     private int _receivedCount = 0;
+    private int _initialContentBodyMapSize;
+    private int _initilaMessageMetaDataMapSize;
 
     protected void setUp() throws Exception
     {
@@ -94,13 +96,19 @@
         env.put("queue.badQueue", QUEUE);
 
         _context = factory.getInitialContext(env);
+
+        getBrokerInitialState();
     }
 
     protected void tearDown() throws Exception
     {
-        _producerConnection.close();
         super.tearDown();
 
+        if (_producerConnection != null)
+        {
+            _producerConnection.close();
+        }
+        
         if (BROKER.startsWith("vm://"))
         {
             TransportConnection.killAllVMBrokers();
@@ -130,7 +138,7 @@
         _producerConnection.close();
 
         //Verify we get all the messages.
-        verifyAllMessagesRecevied();        
+        verifyAllMessagesRecevied();
         //Verify Broker state
         verifyBrokerState();
     }
@@ -153,6 +161,34 @@
         _producer = _producerSession.createProducer((Queue) _context.lookup("badQueue"));
     }
 
+    // todo: collect to a general testing class - duplicated in AMQQueueMBeanTest
+    private void getBrokerInitialState()
+    {
+        IApplicationRegistry registry = ApplicationRegistry.getInstance();
+
+        VirtualHost testVhost = registry.getVirtualHostRegistry().getVirtualHost(VHOST);
+
+        assertNotNull("Unable to get test Vhost", testVhost.getMessageStore());
+
+        TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) testVhost.getMessageStore());
+
+        _initialContentBodyMapSize = store.getContentBodyMap() == null ? 0 : store.getContentBodyMap().size();
+        _initilaMessageMetaDataMapSize = store.getMessageMetaDataMap() == null ? 0 : store.getMessageMetaDataMap().size();
+
+        if (_initialContentBodyMapSize != 0)
+        {
+            _logger.warn("Store is dirty: ContentBodyMap has Size:" + _initialContentBodyMapSize);
+            System.out.println("Store is dirty: ContentBodyMap has Size:" + _initialContentBodyMapSize);
+        }
+
+        if (_initilaMessageMetaDataMapSize != 0)
+        {
+            _logger.warn("Store is dirty: MessageMetaDataMap has Size:" + _initilaMessageMetaDataMapSize);
+            System.out.println("Store is dirty: MessageMetaDataMap has Size:" + _initilaMessageMetaDataMapSize);
+        }
+
+    }
+
     private void verifyBrokerState()
     {
         IApplicationRegistry registry = ApplicationRegistry.getInstance();
@@ -169,7 +205,7 @@
         // If the CBM has content it may be due to the broker not yet purging.
         // Closing the producer connection before testing should give the store time to clean up.
         // Perform a quick sleep just in case
-        if (store.getContentBodyMap().size() != 0)
+        while (store.getContentBodyMap().size() > _initialContentBodyMapSize)
         {
             try
             {
@@ -179,21 +215,9 @@
             {
             }
         }
-        assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+        assertTrue("Expected the store content size not reached at test start it was :" + _initialContentBodyMapSize + " Now it is :" + store.getContentBodyMap().size(), _initialContentBodyMapSize >= store.getContentBodyMap().size());
         assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
-
-        if (store.getMessageMetaDataMap().size() != 0)
-        {
-            try
-            {
-                Thread.sleep(500);
-            }
-            catch (InterruptedException e)
-            {
-            }
-        }
-
-        assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+        assertTrue("Expected the store MessageMetaData size not reached at test start it was :" + _initilaMessageMetaDataMapSize + " Now it is :" + store.getMessageMetaDataMap().size(), _initialContentBodyMapSize >= store.getMessageMetaDataMap().size());
     }
 
     private void verifyAllMessagesRecevied()
@@ -221,7 +245,7 @@
 
     /**
      * We can't verify messageOrder here as the return threads are not synchronized so we have no way of
-     * guarranting the order. 
+     * guarranting the order.
      */
     private void verifyMessageOrder()
     {