You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/13 13:56:50 UTC

svn commit: r764421 - in /qpid/branches/0.5-fix/qpid: ./ java/broker/src/main/java/org/apache/qpid/server/queue/ java/broker/src/main/java/org/apache/qpid/server/util/ java/broker/src/main/java/org/apache/qpid/server/virtualhost/ java/broker/src/test/j...

Author: ritchiem
Date: Mon Apr 13 11:56:49 2009
New Revision: 764421

URL: http://svn.apache.org/viewvc?rev=764421&view=rev
Log:
QPID-430: Fix message age alerting so that it works on queues which are otherwise inactive.

AMQQueue, VirtualHost, MockAMQQueue: change name of removeExpiredIfNoSubscribers to checkMessageStatus.
AMQQueueMBean: remove unthrown exception
SimpleAMQQueue: add notification checks to checkMessageStatus, remove catch for JMException which checkForNotification no longer throws.
NullApplicationRegistry: set a small housekeeping check period so that it runs frequently and tests don't need to sleep for excessive periods of time.
AMQQueueAlertTest: remove subsequent send, notification alerts shouldn't depend on queue activity.

merged from trunk r743357

Modified:
    qpid/branches/0.5-fix/qpid/   (props changed)
    qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
    qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/branches/0.5-fix/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    qpid/branches/0.5-fix/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java

Propchange: qpid/branches/0.5-fix/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 13 11:56:49 2009
@@ -1 +1 @@
-/qpid/trunk/qpid:742626,743015,743028-743029,743304,743306,743311
+/qpid/trunk/qpid:742626,743015,743028-743029,743304,743306,743311,743357

Modified: qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=764421&r1=764420&r2=764421&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Apr 13 11:56:49 2009
@@ -149,9 +149,11 @@
 
     long clearQueue(StoreContext storeContext) throws AMQException;
 
-
-
-    void removeExpiredIfNoSubscribers() throws AMQException;
+    /**
+     * Checks the status of messages on the queue, purging expired ones, firing age related alerts etc.
+     * @throws AMQException
+     */
+    void checkMessageStatus() throws AMQException;
 
     Set<NotificationCheck> getNotificationChecks();
 

Modified: qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=764421&r1=764420&r2=764421&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Apr 13 11:56:49 2009
@@ -246,7 +246,7 @@
     /**
      * Checks if there is any notification to be send to the listeners
      */
-    public void checkForNotification(AMQMessage msg) throws AMQException, JMException
+    public void checkForNotification(AMQMessage msg) throws AMQException
     {
 
         final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();

Modified: qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=764421&r1=764420&r2=764421&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Apr 13 11:56:49 2009
@@ -423,17 +423,9 @@
             deliverAsync();
         }
 
-        try
-        {
-            _managedObject.checkForNotification(entry.getMessage());
-        }
-        catch (JMException e)
-        {
-            throw new AMQException("Unable to get notification from manage queue: " + e, e);
-        }
-
+        _managedObject.checkForNotification(entry.getMessage());
+        
         return entry;
-
     }
 
     private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
@@ -1431,7 +1423,8 @@
         }
     }
 
-    public void removeExpiredIfNoSubscribers() throws AMQException
+    @Override
+    public void checkMessageStatus() throws AMQException
     {
 
         final StoreContext storeContext = new StoreContext();
@@ -1443,10 +1436,12 @@
             QueueEntry node = queueListIterator.getNode();
             if (!node.isDeleted() && node.expired() && node.acquire())
             {
-
                 node.discard(storeContext);
+            } 
+            else 
+            {
+                _managedObject.checkForNotification(node.getMessage());
             }
-
         }
 
     }

Modified: qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=764421&r1=764420&r2=764421&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Mon Apr 13 11:56:49 2009
@@ -49,7 +49,8 @@
         _logger.info("Initialising NullApplicationRegistry");
         
         _configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
-
+        _configuration.addProperty("housekeeping.expiredMessageCheckPeriod", "200");
+        
         Properties users = new Properties();
 
         users.put("guest", "guest");

Modified: qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=764421&r1=764420&r2=764421&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Mon Apr 13 11:56:49 2009
@@ -208,7 +208,7 @@
 
                         try
                         {
-                            q.removeExpiredIfNoSubscribers();
+                            q.checkMessageStatus();
                         }
                         catch (AMQException e)
                         {

Modified: qpid/branches/0.5-fix/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=764421&r1=764420&r2=764421&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ qpid/branches/0.5-fix/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Mon Apr 13 11:56:49 2009
@@ -167,9 +167,6 @@
         // Ensure message sits on queue long enough to age.
         Thread.sleep(MAX_MESSAGE_AGE * 2);
 
-        sendMessages(1, MAX_MESSAGE_SIZE);
-        assertTrue(_queueMBean.getMessageCount() == 2);
-
         Notification lastNotification = _queueMBean.getLastNotification();
         assertNotNull(lastNotification);
 

Modified: qpid/branches/0.5-fix/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=764421&r1=764420&r2=764421&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/0.5-fix/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Apr 13 11:56:49 2009
@@ -277,7 +277,8 @@
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void removeExpiredIfNoSubscribers() throws AMQException
+    @Override
+    public void checkMessageStatus() throws AMQException
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org