You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/07 17:11:57 UTC
svn commit: r942108 - in /qpid/trunk/qpid/java:
broker-plugins/experimental/SlowConsumerDisconnect/
broker-plugins/experimental/SlowConsumerDisconnect/src/main/
broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/
broker-plugins/experiment...
Author: ritchiem
Date: Fri May 7 15:11:56 2010
New Revision: 942108
URL: http://svn.apache.org/viewvc?rev=942108&view=rev
Log:
QPID-1447 : Exclude SCD testing until complete
Added:
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java
Modified:
qpid/trunk/qpid/java/test-profiles/Excludes
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF Fri May 7 15:11:56 2010
@@ -0,0 +1,22 @@
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: Qpid Slow Consumer Detection
+Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true
+Bundle-Version: 1.0.0
+Bundle-Activator: org.apache.qpid.server.virtualhost.plugins.Activator
+Import-Package: org.osgi.framework,
+ org.apache.qpid.server.configuration.plugin,
+ org.apache.qpid.server.configuration,
+ org.apache.qpid.server.virtualhost.plugin,
+ org.apache.qpid.server.virtualhost,
+ org.apache.qpid.server.queue,
+ org.apache.qpid.server.registry,
+ org.apache.qpid.server.plugins,
+ org.apache.qpid,
+ org.apache.log4j,
+ org.apache.commons.configuration
+Bundle-RequiredExecutionEnvironment: JavaSE-1.6
+Bundle-ClassPath: .
+Bundle-ActivationPolicy: lazy
+Export-Package: org.apache.qpid.server.virtualhost.plugins;uses:="org.osgi.framework"
+
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml Fri May 7 15:11:56 2010
@@ -0,0 +1,32 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+nn - or more contributor license agreements. See the NOTICE file
+ -n distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="Slow Consumer Disconnect" default="build">
+
+ <property name="module.depends" value="common broker broker-plugins"/>
+ <property name="module.test.depends" value="broker/test systests client management/common"/>
+ <property name="module.manifest" value="MANIFEST.MF"/>
+ <property name="module.plugin" value="true"/>
+
+ <import file="../../../module.xml"/>
+
+ <target name="bundle" depends="bundle-tasks"/>
+
+</project>
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java Fri May 7 15:11:56 2010
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPluginFactory;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+
+/**
+ * Activator that loads our OSGi bundles for the Slow Consumer Detection plugin.
+ *
+ * This includes Configuration
+ *
+ * @author ritchiem
+ */
+public class Activator implements BundleActivator
+{
+ public void start(BundleContext ctx) throws Exception
+ {
+ if (null != ctx)
+ {
+ ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory(), null);
+ ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory(), null);
+ ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory(), null);
+ ctx.registerService(VirtualHostPluginFactory.class.getName(), new SlowConsumerDetection.SlowConsumerFactory(), null);
+ ctx.registerService(SlowConsumerPolicyPluginFactory.class.getName(), new TopicDeletePolicy.DeletePolicyFactory(), null);
+ }
+ }
+
+ public void stop(BundleContext ctx) throws Exception
+ {
+ // no need to do anything here, osgi will unregister the service for us
+ }
+
+}
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java Fri May 7 15:11:56 2010
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPlugin;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPluginFactory;
+
+class SlowConsumerDetection implements VirtualHostPlugin
+{
+ Logger _logger = Logger.getLogger(SlowConsumerDetection.class);
+ private VirtualHost _virtualhost;
+ private SlowConsumerDetectionConfiguration _config;
+ private SlowConsumerPolicyPlugin _policy;
+
+ public static class SlowConsumerFactory implements VirtualHostPluginFactory
+ {
+ public VirtualHostPlugin newInstance(VirtualHost vhost)
+ {
+ return new SlowConsumerDetection(vhost);
+ }
+ }
+
+ public SlowConsumerDetection(VirtualHost vhost)
+ {
+ _virtualhost = vhost;
+ _config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class);
+ if (_config == null)
+ {
+ throw new IllegalArgumentException("Plugin has not been configured");
+ }
+
+ }
+
+ public void run()
+ {
+ _logger.info("Starting the SlowConsumersDetection job");
+ try
+ {
+ for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues())
+ {
+ _logger.debug("Checking consumer status for queue: "
+ + q.getName());
+ try
+ {
+ SlowConsumerDetectionQueueConfiguration config =
+ q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class);
+
+ if (checkQueueStatus(q, config))
+ {
+ config.getPolicy().performPolicy(q);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception in SlowConsumersDetection " +
+ "for queue: " +
+ q.getNameShortString().toString(), e);
+ //Don't throw exceptions as this will stop the
+ // house keeping task from running.
+ }
+ }
+ _logger.info("SlowConsumersDetection job completed.");
+ }
+ catch (Exception e)
+ {
+ _logger.error("SlowConsumersDetection job failed: " + e.getMessage(), e);
+ }
+ catch (Error e)
+ {
+ _logger.error("SlowConsumersDetection job failed with error: " + e.getMessage(), e);
+ }
+ }
+
+ public long getDelay()
+ {
+ return _config.getDelay();
+ }
+
+ public String getTimeUnit()
+ {
+ return _config.getTimeUnit();
+ }
+
+ /**
+ * Check the depth,messageSize,messageAge,messageCount values for this q
+ *
+ * @param q the queue to check
+ * @param config
+ *
+ * @return true if the queue has reached a threshold.
+ */
+ private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config)
+ {
+
+ _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
+
+ return config != null &&
+ (q.getMessageCount() >= config.getMessageCount() ||
+ q.getQueueDepth() >= config.getDepth() ||
+ q.getOldestMessageArrivalTime() >= config.getMessageAge());
+ }
+}
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java Fri May 7 15:11:56 2010
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin
+{
+ public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory
+ {
+ public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
+ {
+ SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration();
+ slowConsumerConfig.setConfiguration(path, config);
+ return slowConsumerConfig;
+ }
+
+ public String[] getParentPaths()
+ {
+ return new String[]{"virtualhosts.virtualhost.slow-consumer-detection"};
+ }
+ }
+
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"delay",
+ "timeunit"};
+ }
+
+ public long getDelay()
+ {
+ return _configuration.getLong("delay", 10);
+ }
+
+ public String getTimeUnit()
+ {
+ return _configuration.getString("timeunit", TimeUnit.SECONDS.toString());
+ }
+
+ @Override
+ public void setConfiguration(String path, Configuration configuration) throws ConfigurationException
+ {
+ super.setConfiguration(path, configuration);
+
+ System.out.println("Configured SCDC");
+ System.out.println("Delay:" + getDelay());
+ System.out.println("TimeUnit:" + getTimeUnit());
+ }
+}
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java Fri May 7 15:11:56 2010
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+
+import java.util.List;
+
+public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin
+{
+
+ public static class SlowConsumerDetectionPolicyConfigurationFactory
+ implements ConfigurationPluginFactory
+ {
+ public ConfigurationPlugin newInstance(String path,
+ Configuration config)
+ throws ConfigurationException
+ {
+ SlowConsumerDetectionPolicyConfiguration slowConsumerConfig =
+ new SlowConsumerDetectionPolicyConfiguration();
+ slowConsumerConfig.setConfiguration(path, config);
+ return slowConsumerConfig;
+ }
+
+ public String[] getParentPaths()
+ {
+ return new String[]{
+ "virtualhosts.virtualhost.queues.slow-consumer-detection.policy",
+ "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy",
+ "virtualhosts.virtualhost.topics.slow-consumer-detection.policy",
+ "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"};
+ }
+ }
+
+ public String[] getElementsProcessed()
+ {
+ // NOTE: the use of '@name]' rather than '[@name]' this appears to be
+ // a bug in commons configuration.
+ //fixme - Simple test case needs raised and JIRA raised on Commons
+ return new String[]{"@name]", "options"};
+ }
+
+ public String getPolicyName()
+ {
+ return _configuration.getString("[@name]");
+ }
+
+ public String getOption(String option)
+ {
+ List options = _configuration.getList("options.option[@name]");
+
+ if (options != null && options.contains(option))
+ {
+ return _configuration.getString("options.option[@value]" +
+ "(" + options.indexOf(option) + ")");
+ }
+
+ return null;
+ }
+}
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java Fri May 7 15:11:56 2010
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.util.Map;
+
+public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
+{
+ private SlowConsumerPolicyPlugin _policyPlugin;
+
+ public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory
+ {
+ public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
+ {
+ SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration();
+ slowConsumerConfig.setConfiguration(path, config);
+ return slowConsumerConfig;
+ }
+
+ public String[] getParentPaths()
+ {
+ return new String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection",
+ "virtualhosts.virtualhost.queues.queue.slow-consumer-detection",
+ "virtualhosts.virtualhost.topics.slow-consumer-detection",
+ "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"};
+ }
+
+ }
+
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"messageAge",
+ "depth",
+ "messageCount"};
+ }
+
+ public int getMessageAge()
+ {
+ return (int) getConfigurationValue("messageAge");
+ }
+
+ public long getDepth()
+ {
+ return getConfigurationValue("depth");
+ }
+
+ public long getMessageCount()
+ {
+ return getConfigurationValue("messageCount");
+ }
+
+ public SlowConsumerPolicyPlugin getPolicy()
+ {
+ return _policyPlugin;
+ }
+
+ @Override
+ public void setConfiguration(String path, Configuration configuration) throws ConfigurationException
+ {
+ super.setConfiguration(path, configuration);
+
+ SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class);
+
+ PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager();
+ Map<String, SlowConsumerPolicyPluginFactory> factories =
+ pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Configured SCDQC");
+ _logger.debug("Age:" + getMessageAge());
+ _logger.debug("Depth:" + getDepth());
+ _logger.debug("Count:" + getMessageCount());
+ _logger.debug("Policy:" + policyConfig.getPolicyName());
+ _logger.debug("Available factories:" + factories);
+ }
+
+ SlowConsumerPolicyPluginFactory pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase());
+
+ if (pluginFactory == null)
+ {
+ throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet());
+ }
+
+ _policyPlugin = pluginFactory.newInstance(policyConfig);
+ }
+
+ private long getConfigurationValue(String property)
+ {
+ // The _configuration we are given is a munged configurated
+ // so the queue will already have queue-queues munging
+
+ // we then need to ensure that the TopicsConfiguration
+ // and TopicConfiguration classes correctly munge their configuration:
+ // queue-queues -> topic-topics
+
+ return _configuration.getLong(property, 0);
+ }
+
+}
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java Fri May 7 15:11:56 2010
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.queue.AMQQueue;
+
+public interface SlowConsumerPolicyPlugin
+{
+ public void performPolicy(AMQQueue Queue);
+}
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java Fri May 7 15:11:56 2010
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.plugins.PluginFactory;
+
+public interface SlowConsumerPolicyPluginFactory extends PluginFactory
+{
+
+ public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration);
+}
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java Fri May 7 15:11:56 2010
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class TopicDeletePolicy implements SlowConsumerPolicyPlugin
+{
+ Logger _logger = Logger.getLogger(TopicDeletePolicy.class);
+ private SlowConsumerDetectionPolicyConfiguration _configuration;
+
+ public static class DeletePolicyFactory implements SlowConsumerPolicyPluginFactory
+ {
+
+ public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration)
+ {
+ return new TopicDeletePolicy(configuration);
+ }
+
+ public String getPluginName()
+ {
+ return "topicdelete";
+ }
+ }
+
+ public TopicDeletePolicy(SlowConsumerDetectionPolicyConfiguration config)
+ {
+ _configuration = config;
+ }
+
+ public void performPolicy(AMQQueue q)
+ {
+ AMQSessionModel owner = q.getExclusiveOwningSession();
+
+ // Only process exclusive queues
+ if (owner == null)
+ {
+ return;
+ }
+
+ //Only process Topics
+ if(!validateQueueIsATopic(q))
+ {
+ return;
+ }
+
+ try
+ {
+ owner.getConnectionModel().
+ closeSession(owner, AMQConstant.RESOURCE_ERROR,
+ "Consuming to slow.");
+
+ String option = _configuration.getOption("delete-persistent");
+
+ boolean deletePersistent = option != null && Boolean.parseBoolean(option);
+
+ if (!q.isAutoDelete() && deletePersistent)
+ {
+ q.delete();
+ }
+
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName());
+ }
+
+ }
+
+ /**
+ * Check the queue bindings to validate the queue is bound to the
+ * topic exchange.
+ *
+ * @param q the Queue
+ * @return true iff Q is bound to a TopicExchange
+ */
+ private boolean validateQueueIsATopic(AMQQueue q)
+ {
+ for (Binding binding : q.getBindings())
+ {
+ if (binding.getExchange() instanceof TopicExchange)
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
Modified: qpid/trunk/qpid/java/test-profiles/Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=942108&r1=942107&r2=942108&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Excludes Fri May 7 15:11:56 2010
@@ -31,3 +31,6 @@ org.apache.qpid.test.unit.ack.Acknowledg
// QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail.
org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
+
+// QPID-1447 : Work In Progress
+org.apache.qpid.systest.SlowConsumerTest#*
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org