You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/20 17:18:15 UTC

svn commit: r946665 - in /qpid/trunk/qpid/java: broker-plugins/experimental/slowconsumerdetection/ broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/ broker-plugins/experimental/slowconsumerdete...

Author: ritchiem
Date: Thu May 20 15:18:14 2010
New Revision: 946665

URL: http://svn.apache.org/viewvc?rev=946665&view=rev
Log:
QPID-1447 : Fully test the TopicDeletePolicy; update MockAMQQueue and InternalTestProtocolSession to allow:
- Queue - Setting of AutoDelete
- ProtocolSession - Closing

Added:
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java
Modified:
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java

Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml Thu May 20 15:18:14 2010
@@ -21,7 +21,7 @@ nn - or more contributor license agreeme
 <project name="Slow Consumer Disconnect" default="build">
 
     <property name="module.depends" value="common broker broker-plugins"/>
-    <property name="module.test.depends" value="broker/test systests client management/common"/>
+    <property name="module.test.depends" value="broker/test common/test systests client management/common"/>
     <property name="module.manifest" value="MANIFEST.MF"/>
     <property name="module.plugin" value="true"/>
 

Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java Thu May 20 15:18:14 2010
@@ -25,8 +25,6 @@ import org.apache.commons.configuration.
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
 
-import java.util.List;
-
 public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin
 {
 
@@ -61,4 +59,14 @@ public class SlowConsumerDetectionPolicy
         return _configuration.getString("name");
     }
 
+    public void setConfiguration(String path, Configuration configuration) throws ConfigurationException
+    {
+        super.setConfiguration(path,configuration);
+
+        if (getPolicyName() == null)
+        {
+            throw new ConfigurationException("No Slow consumer policy defined.");
+        }
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java Thu May 20 15:18:14 2010
@@ -105,8 +105,6 @@ class SlowConsumerDetection extends Virt
     {
         if (config != null)
         {
-
-
             if ((config.getMessageCount() != 0 && q.getMessageCount() >= config.getMessageCount()) ||
                     (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) ||
                     (config.getMessageAge() != 0 && q.getOldestMessageArrivalTime() >= config.getMessageAge()))

Added: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java?rev=946665&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java Thu May 20 15:18:14 2010
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.policies;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class TopicDeletePolicyTest extends TestCase
+{
+
+    TopicDeletePolicyConfiguration _config;
+
+    VirtualHost _defaultVhost;
+    InternalTestProtocolSession _connection;
+
+    public void setUp() throws ConfigurationException, AMQException
+    {
+        _defaultVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getDefaultVirtualHost();
+
+        _connection = new InternalTestProtocolSession(_defaultVhost);
+
+        _config = new TopicDeletePolicyConfiguration();
+
+        XMLConfiguration config = new XMLConfiguration();
+
+        _config.setConfiguration("", config);
+    }
+
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            ApplicationRegistry.remove();
+        }
+        finally
+        {
+            super.tearDown();
+        }
+    }
+
+    private MockAMQQueue createOwnedQueue()
+    {
+        MockAMQQueue queue = new MockAMQQueue("testQueue");
+
+        _defaultVhost.getQueueRegistry().registerQueue(queue);
+
+        try
+        {
+            AMQChannel channel = new AMQChannel(_connection, 0, null);
+            _connection.addChannel(channel);
+
+            queue.setExclusiveOwningSession(channel);
+        }
+        catch (AMQException e)
+        {
+            fail("Unable to create Channel:" + e.getMessage());
+        }
+
+        return queue;
+    }
+
+    private void setQueueToAutoDelete(final AMQQueue queue)
+    {
+        ((MockAMQQueue) queue).setAutoDelete(true);
+
+        queue.setDeleteOnNoConsumers(true);
+        final AMQProtocolSession.Task deleteQueueTask =
+                new AMQProtocolSession.Task()
+                {
+                    public void doTask(AMQProtocolSession session) throws AMQException
+                    {
+                        queue.delete();
+                    }
+                };
+
+        ((AMQChannel) queue.getExclusiveOwningSession()).getProtocolSession().addSessionCloseTask(deleteQueueTask);
+    }
+
+
+
+    /** Check that a null queue passed in does not upset the policy. */
+    public void testNullQueueParameter()
+    {
+        TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+        try
+        {
+            policy.performPolicy(null);
+        }
+        catch (Exception e)
+        {
+            fail("Exception should not be thrown:" + e.getMessage());
+        }
+
+    }
+
+    /**
+     * Set the owning Session to null which means this is not an exclusive queue
+     * so the queue should not be deleted
+     */
+    public void testNonExclusiveQueue()
+    {
+        TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+        MockAMQQueue queue = createOwnedQueue();
+
+        queue.setExclusiveOwningSession(null);
+
+        policy.performPolicy(queue);
+
+        assertFalse("Queue should not be deleted", queue.isDeleted());
+        assertFalse("Connection should not be closed", _connection.isClosed());
+    }
+
+    /**
+     * Test that exclusive JMS Queues are not deleted.
+     * Bind the queue to the direct exchange (so it is a JMS Queue).
+     *
+     * JMS Queues are not to be processed so this should not delete the queue.
+     */
+    public void testQueuesAreNotProcessed()
+    {
+        TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+        MockAMQQueue queue = createOwnedQueue();
+
+        queue.addBinding(new Binding(null, "bindingKey", queue, new DirectExchange(), null));
+
+        policy.performPolicy(queue);
+
+        assertFalse("Queue should not be deleted", queue.isDeleted());
+        assertFalse("Connection should not be closed", _connection.isClosed());
+    }
+
+
+    /**
+     * Given a non auto-delete queue bound to the topic exchange, the
+     * TopicDeletePolicy will close the connection but NOT delete the queue.
+     */
+    public void testNonAutoDeleteTopicIsNotClosed()
+    {
+        TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+        MockAMQQueue queue = createOwnedQueue();
+
+        queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+        queue.setAutoDelete(false);
+
+        policy.performPolicy(queue);
+
+        assertFalse("Queue should not be deleted", queue.isDeleted());
+        assertTrue("Connection should be closed", _connection.isClosed());
+    }
+
+    /**
+     * Given an auto-delete queue bound to the topic exchange, the TopicDeletePolicy will
+     * close the connection and delete the queue
+     */
+    public void testTopicIsClosed()
+    {
+        TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+        final MockAMQQueue queue = createOwnedQueue();
+
+        queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+        setQueueToAutoDelete(queue);
+
+        policy.performPolicy(queue);
+
+        assertTrue("Queue should be deleted", queue.isDeleted());
+        assertTrue("Connection should be closed", _connection.isClosed());
+    }
+
+    /**
+     * Given a queue bound to the topic exchange, the TopicDeletePolicy will
+     * close the connection and NOT delete the queue
+     */
+    public void testNonAutoDeleteTopicIsClosedNotDeleted() throws AMQException
+    {
+        TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+        MockAMQQueue queue = createOwnedQueue();
+
+        queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+        policy.performPolicy(queue);
+
+        assertFalse("Queue should not be deleted", queue.isDeleted());
+        assertTrue("Connection should be closed", _connection.isClosed());
+    }
+
+    /**
+     * Given a queue bound to the topic exchange, the TopicDeletePolicy suitably
+     * configured with the delete-persistent tag will close the connection
+     * and delete the queue
+     */
+    public void testPersistentTopicIsClosedAndDeleted()
+    {
+        _config.getConfig().addProperty("delete-persistent", "");
+
+        TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+        assertTrue("Config was not updated to delete Persistent topics",
+                   _config.deletePersistent());
+
+        MockAMQQueue queue = createOwnedQueue();
+
+        queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+        policy.performPolicy(queue);
+
+        assertTrue("Queue should be deleted", queue.isDeleted());
+        assertTrue("Connection should be closed", _connection.isClosed());
+    }
+
+}

Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java Thu May 20 15:18:14 2010
@@ -73,7 +73,7 @@ public class SlowConsumerTest extends Qp
         setConfigurationProperty("virtualhosts.virtualhost."
                                  + getConnectionURL().getVirtualHost().substring(1) +
                                  ".queues.slow-consumer-detection." +
-                                 "policy[@name]", "TopicDelete");
+                                 "policy.name", "TopicDelete");
 
         setConfigurationProperty("virtualhosts.virtualhost."
                                  + getConnectionURL().getVirtualHost().substring(1) +
@@ -85,7 +85,7 @@ public class SlowConsumerTest extends Qp
          *  Queue Configuration
 
          <slow-consumer-detection>
-         <!-- The depth before which the policy will be applied-->
+             <!-- The depth before which the policy will be applied-->
              <depth>4235264</depth>
 
              <!-- The message age before which the policy will be applied-->
@@ -96,7 +96,7 @@ public class SlowConsumerTest extends Qp
 
              <!-- Policies configuration -->
              <policy>
-                 <name>TopicDelete"</name>
+                 <name>TopicDelete</name>
                  <topicDelete>
                      <delete-persistent/>
                  </topicDelete>
@@ -107,10 +107,10 @@ public class SlowConsumerTest extends Qp
 
         /**
          *  Plugin Configuration
-         *
+
          <slow-consumer-detection>
-         <delay>1</delay>
-         <timeunit>MINUTES</timeunit>
+            <delay>1</delay>
+            <timeunit>MINUTES</timeunit>
          </slow-consumer-detection>
 
          */

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Thu May 20 15:18:14 2010
@@ -28,10 +28,13 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.transport.TestNetworkDriver;
@@ -196,4 +199,15 @@ public class InternalTestProtocolSession
         //  The alternative is to fully implement the TestIOSession to return a CloseFuture from close();
         //  Then the AMQMinaProtocolSession can join on the returning future without a NPE.
     }
+
+    public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+    {
+        super.closeSession(session, cause, message);
+
+        //Simulate the Client responding with a CloseOK
+        // should really update the StateManger but we don't have access here
+        // changeState(AMQState.CONNECTION_CLOSED);
+        ((AMQChannel)session).getProtocolSession().closeSession();
+
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Thu May 20 15:18:14 2010
@@ -35,10 +35,12 @@ import org.apache.qpid.server.binding.Bi
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.AMQException;
 
+import javax.swing.*;
 import java.util.List;
 import java.util.Set;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public class MockAMQQueue implements AMQQueue
 {
@@ -49,6 +51,9 @@ public class MockAMQQueue implements AMQ
     private PrincipalHolder _principalHolder;
 
     private AMQSessionModel _exclusiveOwner;
+    private AMQShortString _owner;
+    private List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
+    private boolean _autoDelete;
 
     public MockAMQQueue(String name)
     {
@@ -66,17 +71,17 @@ public class MockAMQQueue implements AMQ
 
     public void addBinding(final Binding binding)
     {
-
+        _bindings.add(binding);
     }
 
     public void removeBinding(final Binding binding)
     {
-
+        _bindings.remove(binding);
     }
 
     public List<Binding> getBindings()
     {
-        return null;
+        return _bindings;
     }
 
     public int getBindingCount()
@@ -171,9 +176,15 @@ public class MockAMQQueue implements AMQ
 
     public boolean isAutoDelete()
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return _autoDelete;
     }
 
+    public void setAutoDelete(boolean autodelete)
+    {
+        _autoDelete = autodelete;
+    }
+
+
     public AMQShortString getOwner()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
@@ -194,17 +205,6 @@ public class MockAMQQueue implements AMQ
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-
     public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException
     {
         //To change body of implemented methods use File | Settings | File Templates.
@@ -271,8 +271,9 @@ public class MockAMQQueue implements AMQ
     }
 
     public int delete() throws AMQException
-    {
-       return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    {        
+       _deleted = true;
+       return getMessageCount();
     }
 
     public void enqueue(ServerMessage message) throws AMQException



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org