You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/07 17:11:57 UTC

svn commit: r942108 - in /qpid/trunk/qpid/java: broker-plugins/experimental/SlowConsumerDisconnect/ broker-plugins/experimental/SlowConsumerDisconnect/src/main/ broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/ broker-plugins/experiment...

Author: ritchiem
Date: Fri May  7 15:11:56 2010
New Revision: 942108

URL: http://svn.apache.org/viewvc?rev=942108&view=rev
Log:
QPID-1447 : Exclude SCD testing until complete

Added:
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java
Modified:
    qpid/trunk/qpid/java/test-profiles/Excludes

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF Fri May  7 15:11:56 2010
@@ -0,0 +1,22 @@
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: Qpid Slow Consumer Detection
+Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true
+Bundle-Version: 1.0.0
+Bundle-Activator: org.apache.qpid.server.virtualhost.plugins.Activator
+Import-Package: org.osgi.framework,
+ org.apache.qpid.server.configuration.plugin,
+ org.apache.qpid.server.configuration,
+ org.apache.qpid.server.virtualhost.plugin,
+ org.apache.qpid.server.virtualhost,
+ org.apache.qpid.server.queue,
+ org.apache.qpid.server.registry,
+ org.apache.qpid.server.plugins,
+ org.apache.qpid,
+ org.apache.log4j,
+ org.apache.commons.configuration 
+Bundle-RequiredExecutionEnvironment: JavaSE-1.6
+Bundle-ClassPath: .
+Bundle-ActivationPolicy: lazy
+Export-Package: org.apache.qpid.server.virtualhost.plugins;uses:="org.osgi.framework"
+

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml Fri May  7 15:11:56 2010
@@ -0,0 +1,32 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+nn - or more contributor license agreements.  See the NOTICE file
+ -n distributed with this work for additional information
+ - regarding copyright ownership.  The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied.  See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="Slow Consumer Disconnect" default="build">
+
+    <property name="module.depends" value="common broker broker-plugins"/>
+    <property name="module.test.depends" value="broker/test systests client management/common"/>
+    <property name="module.manifest" value="MANIFEST.MF"/>
+    <property name="module.plugin" value="true"/>
+
+    <import file="../../../module.xml"/>
+
+    <target name="bundle" depends="bundle-tasks"/>
+
+</project>

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java Fri May  7 15:11:56 2010
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPluginFactory;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+
+/**
+ * Activator that loads our OSGi bundles for the Slow Consumer Detection plugin.
+ *
+ * This includes Configuration 
+ *
+ * @author ritchiem
+ */
+public class Activator implements BundleActivator
+{
+    public void start(BundleContext ctx) throws Exception
+    {
+        if (null != ctx)
+        {
+            ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory(), null);
+            ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory(), null);
+            ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory(), null);
+            ctx.registerService(VirtualHostPluginFactory.class.getName(), new SlowConsumerDetection.SlowConsumerFactory(), null);
+            ctx.registerService(SlowConsumerPolicyPluginFactory.class.getName(), new TopicDeletePolicy.DeletePolicyFactory(), null);
+        }
+    }
+
+    public void stop(BundleContext ctx) throws Exception
+    {
+        // no need to do anything here, osgi will unregister the service for us
+    }
+
+}

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java Fri May  7 15:11:56 2010
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPlugin;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPluginFactory;
+
+class SlowConsumerDetection implements VirtualHostPlugin
+{
+    Logger _logger = Logger.getLogger(SlowConsumerDetection.class);
+    private VirtualHost _virtualhost;
+    private SlowConsumerDetectionConfiguration _config;
+    private SlowConsumerPolicyPlugin _policy;
+
+    public static class SlowConsumerFactory implements VirtualHostPluginFactory
+    {
+        public VirtualHostPlugin newInstance(VirtualHost vhost)
+        {
+            return new SlowConsumerDetection(vhost);
+        }
+    }
+
+    public SlowConsumerDetection(VirtualHost vhost)
+    {
+        _virtualhost = vhost;
+        _config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class);
+        if (_config == null)
+        {
+            throw new IllegalArgumentException("Plugin has not been configured");
+        }
+
+    }
+
+    public void run()
+    {
+        _logger.info("Starting the SlowConsumersDetection job");
+        try
+        {
+            for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues())
+            {
+                _logger.debug("Checking consumer status for queue: "
+                              + q.getName());
+                try
+                {
+                    SlowConsumerDetectionQueueConfiguration config =
+                            q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class);
+
+                    if (checkQueueStatus(q, config))
+                    {
+                        config.getPolicy().performPolicy(q);
+                    }
+                }
+                catch (Exception e)
+                {
+                    _logger.error("Exception in SlowConsumersDetection " +
+                                  "for queue: " +
+                                  q.getNameShortString().toString(), e);
+                    //Don't throw exceptions as this will stop the
+                    // house keeping task from running.
+                }
+            }
+            _logger.info("SlowConsumersDetection job completed.");
+        }
+        catch (Exception e)
+        {
+            _logger.error("SlowConsumersDetection job failed: " + e.getMessage(), e);
+        }
+        catch (Error e)
+        {
+            _logger.error("SlowConsumersDetection job failed with error: " + e.getMessage(), e);
+        }
+    }
+
+    public long getDelay()
+    {
+        return _config.getDelay();
+    }
+
+    public String getTimeUnit()
+    {
+        return _config.getTimeUnit();
+    }
+
+    /**
+     * Check the depth,messageSize,messageAge,messageCount values for this q
+     *
+     * @param q      the queue to check
+     * @param config
+     *
+     * @return true if the queue has reached a threshold.
+     */
+    private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config)
+    {
+
+        _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
+
+        return config != null &&
+               (q.getMessageCount() >= config.getMessageCount() ||
+                q.getQueueDepth() >= config.getDepth() ||
+                q.getOldestMessageArrivalTime() >= config.getMessageAge());
+    }
+}

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java Fri May  7 15:11:56 2010
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin
+{
+    public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory
+    {
+        public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
+        {
+            SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration();
+            slowConsumerConfig.setConfiguration(path, config);
+            return slowConsumerConfig;
+        }
+
+        public String[] getParentPaths()
+        {
+            return new String[]{"virtualhosts.virtualhost.slow-consumer-detection"};
+        }
+    }
+
+    public String[] getElementsProcessed()
+    {
+        return new String[]{"delay",
+                            "timeunit"};
+    }
+
+    public long getDelay()
+    {
+        return _configuration.getLong("delay", 10);
+    }
+
+    public String getTimeUnit()
+    {
+        return _configuration.getString("timeunit", TimeUnit.SECONDS.toString());
+    }
+
+    @Override
+    public void setConfiguration(String path, Configuration configuration) throws ConfigurationException
+    {
+        super.setConfiguration(path, configuration);
+
+        System.out.println("Configured SCDC");
+        System.out.println("Delay:" + getDelay());
+        System.out.println("TimeUnit:" + getTimeUnit());
+    }
+}

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java Fri May  7 15:11:56 2010
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+
+import java.util.List;
+
+public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin
+{
+
+    public static class SlowConsumerDetectionPolicyConfigurationFactory
+            implements ConfigurationPluginFactory
+    {
+        public ConfigurationPlugin newInstance(String path,
+                                               Configuration config)
+                throws ConfigurationException
+        {
+            SlowConsumerDetectionPolicyConfiguration slowConsumerConfig =
+                    new SlowConsumerDetectionPolicyConfiguration();
+            slowConsumerConfig.setConfiguration(path, config);
+            return slowConsumerConfig;
+        }
+
+        public String[] getParentPaths()
+        {
+            return new String[]{
+                    "virtualhosts.virtualhost.queues.slow-consumer-detection.policy",
+                    "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy",
+                    "virtualhosts.virtualhost.topics.slow-consumer-detection.policy",
+                    "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"};
+        }
+    }
+
+    public String[] getElementsProcessed()
+    {
+        // NOTE: the use of '@name]' rather than '[@name]' this appears to be
+        // a bug in commons configuration.
+        //fixme - Simple test case needs raised and JIRA raised on Commons
+        return new String[]{"@name]", "options"};
+    }
+
+    public String getPolicyName()
+    {
+        return _configuration.getString("[@name]");
+    }
+
+    public String getOption(String option)
+    {
+        List options = _configuration.getList("options.option[@name]");
+
+        if (options != null && options.contains(option))
+        {
+            return _configuration.getString("options.option[@value]" +
+                                            "(" + options.indexOf(option) + ")");
+        }
+
+        return null;
+    }
+}

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java Fri May  7 15:11:56 2010
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.util.Map;
+
+public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
+{
+    private SlowConsumerPolicyPlugin _policyPlugin;
+
+    public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory
+    {
+        public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
+        {
+            SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration();
+            slowConsumerConfig.setConfiguration(path, config);
+            return slowConsumerConfig;
+        }
+
+        public String[] getParentPaths()
+        {
+            return new String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection",
+                                "virtualhosts.virtualhost.queues.queue.slow-consumer-detection",
+                                "virtualhosts.virtualhost.topics.slow-consumer-detection",
+                                "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"};
+        }
+
+    }
+
+    public String[] getElementsProcessed()
+    {
+        return new String[]{"messageAge",
+                            "depth",
+                            "messageCount"};
+    }
+
+    public int getMessageAge()
+    {
+        return (int) getConfigurationValue("messageAge");
+    }
+
+    public long getDepth()
+    {
+        return getConfigurationValue("depth");
+    }
+
+    public long getMessageCount()
+    {
+        return getConfigurationValue("messageCount");
+    }
+
+    public SlowConsumerPolicyPlugin getPolicy()
+    {
+        return _policyPlugin;
+    }
+
+    @Override
+    public void setConfiguration(String path, Configuration configuration) throws ConfigurationException
+    {
+        super.setConfiguration(path, configuration);
+
+        SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class);
+
+        PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager();
+        Map<String, SlowConsumerPolicyPluginFactory> factories =
+                pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Configured SCDQC");
+            _logger.debug("Age:" + getMessageAge());
+            _logger.debug("Depth:" + getDepth());
+            _logger.debug("Count:" + getMessageCount());
+            _logger.debug("Policy:" + policyConfig.getPolicyName());
+            _logger.debug("Available factories:" + factories);
+        }
+
+        SlowConsumerPolicyPluginFactory pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase());
+
+        if (pluginFactory == null)
+        {
+            throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet());
+        }
+
+        _policyPlugin = pluginFactory.newInstance(policyConfig);
+    }
+
+    private long getConfigurationValue(String property)
+    {
+        // The _configuration we are given is a munged configurated
+        // so the queue will already have queue-queues munging
+
+        // we then need to ensure that the TopicsConfiguration
+        // and TopicConfiguration classes correctly munge their configuration:
+        // queue-queues -> topic-topics
+
+        return _configuration.getLong(property, 0);
+    }
+
+}

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java Fri May  7 15:11:56 2010
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.queue.AMQQueue;
+
+public interface SlowConsumerPolicyPlugin
+{
+    public void performPolicy(AMQQueue Queue);
+}

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java Fri May  7 15:11:56 2010
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.plugins.PluginFactory;
+
+public interface SlowConsumerPolicyPluginFactory extends PluginFactory
+{
+
+    public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration);
+}

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java?rev=942108&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java Fri May  7 15:11:56 2010
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class TopicDeletePolicy implements SlowConsumerPolicyPlugin
+{
+    Logger _logger = Logger.getLogger(TopicDeletePolicy.class);
+    private SlowConsumerDetectionPolicyConfiguration _configuration;
+
+    public static class DeletePolicyFactory implements SlowConsumerPolicyPluginFactory
+    {
+
+        public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration)
+        {
+            return new TopicDeletePolicy(configuration);
+        }
+
+        public String getPluginName()
+        {
+            return "topicdelete";
+        }
+    }
+
+    public TopicDeletePolicy(SlowConsumerDetectionPolicyConfiguration config)
+    {
+        _configuration = config;
+    }
+
+    public void performPolicy(AMQQueue q)
+    {
+        AMQSessionModel owner = q.getExclusiveOwningSession();
+
+        // Only process exclusive queues
+        if (owner == null)
+        {
+            return;
+        }
+
+        //Only process Topics
+        if(!validateQueueIsATopic(q))
+        {
+            return;
+        }
+
+        try
+        {
+            owner.getConnectionModel().
+                    closeSession(owner, AMQConstant.RESOURCE_ERROR,
+                                 "Consuming to slow.");
+
+            String option = _configuration.getOption("delete-persistent");
+
+            boolean deletePersistent = option != null && Boolean.parseBoolean(option);
+
+            if (!q.isAutoDelete() && deletePersistent)
+            {
+                q.delete();
+            }
+
+        }
+        catch (AMQException e)
+        {
+            _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName());
+        }
+
+    }
+
+    /**
+     * Check the queue bindings to validate the queue is bound to the
+     * topic exchange.
+     *
+     * @param q the Queue
+     * @return true iff Q is bound to a TopicExchange
+     */
+    private boolean validateQueueIsATopic(AMQQueue q)
+    {
+        for (Binding binding : q.getBindings())
+        {
+            if (binding.getExchange() instanceof TopicExchange)
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}

Modified: qpid/trunk/qpid/java/test-profiles/Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=942108&r1=942107&r2=942108&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Excludes Fri May  7 15:11:56 2010
@@ -31,3 +31,6 @@ org.apache.qpid.test.unit.ack.Acknowledg
 
 // QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail.
 org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
+
+// QPID-1447 : Work In Progress
+org.apache.qpid.systest.SlowConsumerTest#*



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org