You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/08/21 15:53:30 UTC
svn commit: r687764 - in /incubator/qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/server/
main/java/org/apache/qpid/server/configuration/
main/java/org/apache/qpid/server/handler/
main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/queue/
Author: aidan
Date: Thu Aug 21 06:53:28 2008
New Revision: 687764
URL: http://svn.apache.org/viewvc?rev=687764&view=rev
Log:
QPID-1167: reset queue notification lists when creating queues. Pull out defaults centrally.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Thu Aug 21 06:53:28 2008
@@ -183,13 +183,6 @@
_messageStore.createQueue(queue);
}
- Configuration virtualHostDefaultQueueConfiguration =
- VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
- if (virtualHostDefaultQueueConfiguration != null)
- {
- Configurator.configure(queue, virtualHostDefaultQueueConfiguration);
- }
-
_queueRegistry.registerQueue(queue);
}
catch (AMQException ex)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Thu Aug 21 06:53:28 2008
@@ -130,13 +130,13 @@
}
}
- public static CompositeConfiguration getDefaultQueueConfiguration(AMQQueue queue)
+ public static CompositeConfiguration getDefaultQueueConfiguration(VirtualHost host)
{
CompositeConfiguration queueConfiguration = null;
if (_config == null)
return null;
- Configuration vHostConfiguration = _config.subset(VIRTUALHOST_PROPERTY_BASE + queue.getVirtualHost().getName());
+ Configuration vHostConfiguration = _config.subset(VIRTUALHOST_PROPERTY_BASE + host.getName());
if (vHostConfiguration == null)
return null;
@@ -193,7 +193,10 @@
queue = AMQQueueFactory.createAMQQueueImpl(queueName,
durable,
owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */,
- autodelete /* Therefore autodelete makes no sence */, virtualHost, arguments);
+ autodelete /* Therefore autodelete makes no sence */,
+ virtualHost,
+ arguments,
+ queueConfiguration);
if (queue.isDurable())
{
@@ -247,10 +250,6 @@
}
}
-
-
-
- Configurator.configure(queue, queueConfiguration);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Thu Aug 21 06:53:28 2008
@@ -203,13 +203,7 @@
}
});
}// if exclusive and not durable
-
- Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
- if (virtualHostDefaultQueueConfiguration != null)
- {
- Configurator.configure(queue, virtualHostDefaultQueueConfiguration);
- }
-
+
return queue;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Aug 21 06:53:28 2008
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.exchange.Exchange;
@@ -207,4 +209,6 @@
{
public void doTask(AMQQueue queue) throws AMQException;
}
+
+ void configure(Configuration virtualHostDefaultQueueConfiguration);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Thu Aug 21 06:53:28 2008
@@ -20,8 +20,10 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
@@ -31,22 +33,43 @@
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
public static AMQQueue createAMQQueueImpl(AMQShortString name,
+ boolean durable,
+ AMQShortString owner,
+ boolean autoDelete,
+ VirtualHost virtualHost, final FieldTable arguments)
+
+ throws AMQException
+ {
+
+ return createAMQQueueImpl(name, durable, owner, autoDelete,
+ virtualHost, arguments,
+ VirtualHostConfiguration.getDefaultQueueConfiguration(virtualHost));
+ }
+
+ public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
boolean autoDelete,
- VirtualHost virtualHost, final FieldTable arguments)
+ VirtualHost virtualHost, final FieldTable arguments,
+ Configuration queueConfiguration)
throws AMQException
{
final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
+ AMQQueue q = null;
if(priorities > 1)
{
- return new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities);
+ q = new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities);
}
else
{
- return new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
+ q = new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
+ }
+ if (q != null && queueConfiguration != null)
+ {
+ q.configure(queueConfiguration);
}
+ return q;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Aug 21 06:53:28 2008
@@ -3,6 +3,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MessageStore;
@@ -14,6 +15,7 @@
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.configuration.Configured;
+import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import javax.management.JMException;
@@ -160,12 +162,17 @@
throw new AMQException("AMQQueue MBean creation has failed ", e);
}
+ resetNotifications();
+
+ }
+
+ private void resetNotifications()
+ {
// This ensure that the notification checks for the configured alerts are created.
setMaximumMessageAge(_maximumMessageAge);
setMaximumMessageCount(_maximumMessageCount);
setMaximumMessageSize(_maximumMessageSize);
setMaximumQueueDepth(_maximumQueueDepth);
-
}
// ------ Getters and Setters
@@ -1635,4 +1642,10 @@
}
return ids;
}
+
+ public void configure(Configuration queueConfiguration)
+ {
+ Configurator.configure(this, queueConfiguration);
+ resetNotifications();
+ }
}
\ No newline at end of file
Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Thu Aug 21 06:53:28 2008
@@ -40,6 +40,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.mina.common.ByteBuffer;
import javax.management.Notification;
@@ -47,6 +48,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
+import java.util.Set;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
@@ -251,6 +253,26 @@
assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
}
+ public void testAlertConfiguration() throws AMQException
+ {
+ // Setup configuration
+ CompositeConfiguration config = new CompositeConfiguration();
+ config.setProperty("maximumMessageSize", new Long(23));
+ config.setProperty("maximumMessageCount", new Long(24));
+ config.setProperty("maximumQueueDepth", new Long(25));
+ config.setProperty("maximumMessageAge", new Long(26));
+
+ // Create queue and set config
+ _queue = getNewQueue();
+ _queue.configure(config);
+
+ // Check alerts and notifications
+ Set<NotificationCheck> checks = _queue.getNotificationChecks();
+ assertNotNull("No checks found", checks);
+ assertFalse("Checks should not be empty", checks.isEmpty());
+ assertEquals("Wrong number of checks", 4, checks.size());
+ }
+
protected IncomingMessage message(final boolean immediate, long size) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()