You are viewing a plain-text version of this content; the canonical link is not included in this copy.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/05/31 18:07:23 UTC
svn commit: r949786 - in
/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid:
server/configuration/plugin/ server/virtualhost/plugin/
server/virtualhost/plugin/policies/ slowconsumerdetection/policies/
Author: robbie
Date: Mon May 31 16:07:22 2010
New Revision: 949786
URL: http://svn.apache.org/viewvc?rev=949786&view=rev
Log:
QPID-1447: Updated to use newer configuration plugin interface
Applied patch from Andrew Kennedy <an...@gmail.com>
Modified:
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java?rev=949786&r1=949785&r2=949786&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java Mon May 31 16:07:22 2010
@@ -26,6 +26,8 @@ import org.apache.commons.configuration.
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeUnit;
public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin
@@ -39,9 +41,9 @@ public class SlowConsumerDetectionConfig
return slowConsumerConfig;
}
- public String[] getParentPaths()
+ public List<String> getParentPaths()
{
- return new String[]{"virtualhosts.virtualhost.slow-consumer-detection"};
+ return Arrays.asList("virtualhosts.virtualhost.slow-consumer-detection");
}
}
@@ -71,7 +73,6 @@ public class SlowConsumerDetectionConfig
String timeUnit = getStringValue("timeunit");
-
if (timeUnit != null)
{
try
@@ -84,7 +85,6 @@ public class SlowConsumerDetectionConfig
}
}
-
System.out.println("Configured SCDC");
System.out.println("Delay:" + getDelay());
System.out.println("TimeUnit:" + getTimeUnit());
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java?rev=949786&r1=949785&r2=949786&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java Mon May 31 16:07:22 2010
@@ -25,27 +25,27 @@ import org.apache.commons.configuration.
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import java.util.Arrays;
+import java.util.List;
+
public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin
{
-
- public static class SlowConsumerDetectionPolicyConfigurationFactory
- implements ConfigurationPluginFactory
+ public static class SlowConsumerDetectionPolicyConfigurationFactory implements ConfigurationPluginFactory
{
- public ConfigurationPlugin newInstance(String path,
- Configuration config)
- throws ConfigurationException
+ public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
{
- SlowConsumerDetectionPolicyConfiguration slowConsumerConfig =
- new SlowConsumerDetectionPolicyConfiguration();
+ SlowConsumerDetectionPolicyConfiguration slowConsumerConfig = new SlowConsumerDetectionPolicyConfiguration();
slowConsumerConfig.setConfiguration(path, config);
return slowConsumerConfig;
}
- public String[] getParentPaths()
+ public List<String> getParentPaths()
{
- return new String[]{
+ return Arrays.asList(
"virtualhosts.virtualhost.queues.slow-consumer-detection.policy",
- "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy"};
+ "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy",
+ "virtualhosts.virtualhost.topics.slow-consumer-detection.policy",
+ "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy");
}
}
@@ -67,5 +67,4 @@ public class SlowConsumerDetectionPolicy
throw new ConfigurationException("No Slow consumer policy defined.");
}
}
-
}
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java?rev=949786&r1=949785&r2=949786&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java Mon May 31 16:07:22 2010
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.server.configuration.plugin;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
@@ -29,9 +34,6 @@ import org.apache.qpid.server.registry.A
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
-import java.util.Iterator;
-import java.util.Map;
-
public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
{
private SlowConsumerPolicyPlugin _policyPlugin;
@@ -45,12 +47,14 @@ public class SlowConsumerDetectionQueueC
return slowConsumerConfig;
}
- public String[] getParentPaths()
+ public List<String> getParentPaths()
{
- return new String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection",
- "virtualhosts.virtualhost.queues.queue.slow-consumer-detection"};
+ return Arrays.asList(
+ "virtualhosts.virtualhost.queues.slow-consumer-detection",
+ "virtualhosts.virtualhost.queues.queue.slow-consumer-detection",
+ "virtualhosts.virtualhost.topics.slow-consumer-detection",
+ "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection");
}
-
}
public String[] getElementsProcessed()
@@ -81,7 +85,7 @@ public class SlowConsumerDetectionQueueC
}
@Override
- public void validateConfiguration() throws ConfigurationException
+ public void validateConfiguration() throws ConfigurationException
{
if (!containsPositiveLong("messageAge") &&
!containsPositiveLong("depth") &&
@@ -91,12 +95,10 @@ public class SlowConsumerDetectionQueueC
"('messageAge','depth' or 'messageCount') must be specified.");
}
- SlowConsumerDetectionPolicyConfiguration policyConfig =
- getConfiguration(SlowConsumerDetectionPolicyConfiguration.class);
+ SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class);
PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager();
- Map<String, SlowConsumerPolicyPluginFactory> factories =
- pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
+ Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
if (policyConfig == null)
{
@@ -122,7 +124,7 @@ public class SlowConsumerDetectionQueueC
_logger.debug("Available factories:" + factories);
}
- SlowConsumerPolicyPluginFactory pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase());
+ SlowConsumerPolicyPluginFactory<SlowConsumerPolicyPlugin> pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase());
if (pluginFactory == null)
{
@@ -131,5 +133,4 @@ public class SlowConsumerDetectionQueueC
_policyPlugin = pluginFactory.newInstance(policyConfig);
}
-
}
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java?rev=949786&r1=949785&r2=949786&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java Mon May 31 16:07:22 2010
@@ -20,39 +20,55 @@
*/
package org.apache.qpid.server.virtualhost.plugin;
-import org.apache.log4j.Logger;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration;
import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostHouseKeepingPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
-import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
-
-import java.util.concurrent.TimeUnit;
class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
- Logger _logger = Logger.getLogger(SlowConsumerDetection.class);
private SlowConsumerDetectionConfiguration _config;
public static class SlowConsumerFactory implements VirtualHostPluginFactory
{
- public VirtualHostHouseKeepingPlugin newInstance(VirtualHost vhost)
+ public Class<SlowConsumerDetection> getPluginClass()
+ {
+ return SlowConsumerDetection.class;
+ }
+
+ public String getPluginName()
{
- return new SlowConsumerDetection(vhost);
+ return SlowConsumerDetection.class.getName();
+ }
+
+ public SlowConsumerDetection newInstance(VirtualHost vhost) throws ConfigurationException
+ {
+ SlowConsumerDetection plugin = new SlowConsumerDetection(vhost);
+ plugin.configure(vhost.getConfiguration());
+ return plugin;
}
}
- public SlowConsumerDetection(VirtualHost vhost)
+ public void configure(ConfigurationPlugin config) throws ConfigurationException
{
- super(vhost);
- _config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class);
+ _config = config.getConfiguration(SlowConsumerDetectionConfiguration.class);
+
if (_config == null)
{
throw new IllegalArgumentException("Plugin has not been configured");
}
}
+
+ public SlowConsumerDetection(VirtualHost vhost)
+ {
+ super(vhost);
+ }
@Override
public void execute()
@@ -60,12 +76,11 @@ class SlowConsumerDetection extends Virt
_logger.info("Starting the SlowConsumersDetection job");
for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues())
{
- _logger.debug("Checking consumer status for queue: "
- + q.getName());
+ _logger.debug("Checking consumer status for queue: " + q.getName());
try
{
SlowConsumerDetectionQueueConfiguration config =
- q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class);
+ q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class);
if (checkQueueStatus(q, config))
{
@@ -106,6 +121,7 @@ class SlowConsumerDetection extends Virt
{
if (config != null)
{
+ _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
if ((config.getMessageCount() != 0 && q.getMessageCount() >= config.getMessageCount()) ||
(config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) ||
(config.getMessageAge() != 0 && q.getOldestMessageArrivalTime() >= config.getMessageAge()))
@@ -124,4 +140,19 @@ class SlowConsumerDetection extends Virt
}
return false;
}
+
+ public String getPluginName()
+ {
+ return SlowConsumerDetection.class.getName();
+ }
+
+ public boolean isConfigured()
+ {
+ return _config != null && _virtualhost != null;
+ }
+
+ public void configure() throws ConfigurationException
+ {
+ // Empty
+ }
}
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java?rev=949786&r1=949785&r2=949786&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java Mon May 31 16:07:22 2010
@@ -26,7 +26,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
@@ -39,12 +41,11 @@ public class TopicDeletePolicy implement
public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory
{
-
- public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration) throws ConfigurationException
+ public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException
{
TopicDeletePolicyConfiguration config =
configuration.getConfiguration(TopicDeletePolicyConfiguration.class);
-
+
return new TopicDeletePolicy(config);
}
@@ -52,6 +53,11 @@ public class TopicDeletePolicy implement
{
return "topicdelete";
}
+
+ public Class<TopicDeletePolicy> getPluginClass()
+ {
+ return TopicDeletePolicy.class;
+ }
}
public TopicDeletePolicy(TopicDeletePolicyConfiguration config)
@@ -121,4 +127,22 @@ public class TopicDeletePolicy implement
return false;
}
+
+ @Override
+ public String getPluginName()
+ {
+ return "topicdelete";
+ }
+
+ @Override
+ public boolean isConfigured()
+ {
+ return _configuration != null;
+ }
+
+ @Override
+ public void configure() throws ConfigurationException
+ {
+ // Empty
+ }
}
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java?rev=949786&r1=949785&r2=949786&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java Mon May 31 16:07:22 2010
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.virtualhost.plugin.policies;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
@@ -41,11 +44,11 @@ public class TopicDeletePolicyConfigurat
return slowConsumerConfig;
}
- public String[] getParentPaths()
+ public List<String> getParentPaths()
{
- return new String[]{
+ return Arrays.asList(
"virtualhosts.virtualhost.queues.slow-consumer-detection.policy.topicDelete",
- "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete"};
+ "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete");
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java?rev=949786&r1=949785&r2=949786&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java Mon May 31 16:07:22 2010
@@ -20,9 +20,10 @@
*/
package org.apache.qpid.slowconsumerdetection.policies;
+import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.queue.AMQQueue;
-public interface SlowConsumerPolicyPlugin
+public interface SlowConsumerPolicyPlugin extends Plugin
{
public void performPolicy(AMQQueue Queue);
}
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java?rev=949786&r1=949785&r2=949786&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java Mon May 31 16:07:22 2010
@@ -20,12 +20,8 @@
*/
package org.apache.qpid.slowconsumerdetection.policies;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration;
import org.apache.qpid.server.plugins.PluginFactory;
-public interface SlowConsumerPolicyPluginFactory extends PluginFactory
+public interface SlowConsumerPolicyPluginFactory<P extends SlowConsumerPolicyPlugin> extends PluginFactory<P>
{
-
- public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration) throws ConfigurationException;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org