You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/20 17:18:15 UTC
svn commit: r946665 - in /qpid/trunk/qpid/java:
broker-plugins/experimental/slowconsumerdetection/
broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/
broker-plugins/experimental/slowconsumerdete...
Author: ritchiem
Date: Thu May 20 15:18:14 2010
New Revision: 946665
URL: http://svn.apache.org/viewvc?rev=946665&view=rev
Log:
QPID-1447 : Fully test the TopicDeletePolicy, update MockAMQQueue and InternalTestProtocolSession to allow:
- Queue- Setting of AutoDelete
- ProtocolSession - Closing
Added:
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java
Modified:
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml Thu May 20 15:18:14 2010
@@ -21,7 +21,7 @@ nn - or more contributor license agreeme
<project name="Slow Consumer Disconnect" default="build">
<property name="module.depends" value="common broker broker-plugins"/>
- <property name="module.test.depends" value="broker/test systests client management/common"/>
+ <property name="module.test.depends" value="broker/test common/test systests client management/common"/>
<property name="module.manifest" value="MANIFEST.MF"/>
<property name="module.plugin" value="true"/>
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java Thu May 20 15:18:14 2010
@@ -25,8 +25,6 @@ import org.apache.commons.configuration.
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-import java.util.List;
-
public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin
{
@@ -61,4 +59,14 @@ public class SlowConsumerDetectionPolicy
return _configuration.getString("name");
}
+ public void setConfiguration(String path, Configuration configuration) throws ConfigurationException
+ {
+ super.setConfiguration(path,configuration);
+
+ if (getPolicyName() == null)
+ {
+ throw new ConfigurationException("No Slow consumer policy defined.");
+ }
+ }
+
}
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java Thu May 20 15:18:14 2010
@@ -105,8 +105,6 @@ class SlowConsumerDetection extends Virt
{
if (config != null)
{
-
-
if ((config.getMessageCount() != 0 && q.getMessageCount() >= config.getMessageCount()) ||
(config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) ||
(config.getMessageAge() != 0 && q.getOldestMessageArrivalTime() >= config.getMessageAge()))
Added: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java?rev=946665&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java Thu May 20 15:18:14 2010
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.policies;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class TopicDeletePolicyTest extends TestCase
+{
+
+ TopicDeletePolicyConfiguration _config;
+
+ VirtualHost _defaultVhost;
+ InternalTestProtocolSession _connection;
+
+ public void setUp() throws ConfigurationException, AMQException
+ {
+ _defaultVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getDefaultVirtualHost();
+
+ _connection = new InternalTestProtocolSession(_defaultVhost);
+
+ _config = new TopicDeletePolicyConfiguration();
+
+ XMLConfiguration config = new XMLConfiguration();
+
+ _config.setConfiguration("", config);
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ ApplicationRegistry.remove();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ private MockAMQQueue createOwnedQueue()
+ {
+ MockAMQQueue queue = new MockAMQQueue("testQueue");
+
+ _defaultVhost.getQueueRegistry().registerQueue(queue);
+
+ try
+ {
+ AMQChannel channel = new AMQChannel(_connection, 0, null);
+ _connection.addChannel(channel);
+
+ queue.setExclusiveOwningSession(channel);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to create Channel:" + e.getMessage());
+ }
+
+ return queue;
+ }
+
+ private void setQueueToAutoDelete(final AMQQueue queue)
+ {
+ ((MockAMQQueue) queue).setAutoDelete(true);
+
+ queue.setDeleteOnNoConsumers(true);
+ final AMQProtocolSession.Task deleteQueueTask =
+ new AMQProtocolSession.Task()
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ queue.delete();
+ }
+ };
+
+ ((AMQChannel) queue.getExclusiveOwningSession()).getProtocolSession().addSessionCloseTask(deleteQueueTask);
+ }
+
+
+
+ /** Check that a null queue passed in does not upset the policy. */
+ public void testNullQueueParameter()
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ try
+ {
+ policy.performPolicy(null);
+ }
+ catch (Exception e)
+ {
+ fail("Exception should not be thrown:" + e.getMessage());
+ }
+
+ }
+
+ /**
+ * Set the owning Session to null, which means this is not an exclusive queue,
+ * so the queue should not be deleted and the connection should not be closed.
+ */
+ public void testNonExclusiveQueue()
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ MockAMQQueue queue = createOwnedQueue();
+
+ queue.setExclusiveOwningSession(null);
+
+ policy.performPolicy(queue);
+
+ assertFalse("Queue should not be deleted", queue.isDeleted());
+ assertFalse("Connection should not be closed", _connection.isClosed());
+ }
+
+ /**
+ * Test that exclusive JMS Queues are not deleted.
+ * Bind the queue to the direct exchange (so it is a JMS Queue).
+ *
+ * JMS Queues are not to be processed so this should not delete the queue.
+ */
+ public void testQueuesAreNotProcessed()
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ MockAMQQueue queue = createOwnedQueue();
+
+ queue.addBinding(new Binding(null, "bindingKey", queue, new DirectExchange(), null));
+
+ policy.performPolicy(queue);
+
+ assertFalse("Queue should not be deleted", queue.isDeleted());
+ assertFalse("Connection should not be closed", _connection.isClosed());
+ }
+
+
+ /**
+ * Given a non auto-delete queue bound to the topic exchange, the TopicDeletePolicy
+ * closes the connection but does NOT delete the queue (despite the "IsNotClosed" name).
+ */
+ public void testNonAutoDeleteTopicIsNotClosed()
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ MockAMQQueue queue = createOwnedQueue();
+
+ queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+ queue.setAutoDelete(false);
+
+ policy.performPolicy(queue);
+
+ assertFalse("Queue should not be deleted", queue.isDeleted());
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ /**
+ * Given an auto-delete queue bound to the topic exchange, the TopicDeletePolicy will
+ * close the connection and delete the queue.
+ */
+ public void testTopicIsClosed()
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ final MockAMQQueue queue = createOwnedQueue();
+
+ queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+ setQueueToAutoDelete(queue);
+
+ policy.performPolicy(queue);
+
+ assertTrue("Queue should be deleted", queue.isDeleted());
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ /**
+ * Given a queue bound to the topic exchange, the TopicDeletePolicy will
+ * close the connection and NOT delete the queue.
+ */
+ public void testNonAutoDeleteTopicIsClosedNotDeleted() throws AMQException
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ MockAMQQueue queue = createOwnedQueue();
+
+ queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+ policy.performPolicy(queue);
+
+ assertFalse("Queue should not be deleted", queue.isDeleted());
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ /**
+ * Given a queue bound to the topic exchange, the TopicDeletePolicy, when
+ * configured with the delete-persistent tag, will close the connection
+ * and delete the queue.
+ */
+ public void testPersistentTopicIsClosedAndDeleted()
+ {
+ _config.getConfig().addProperty("delete-persistent", "");
+
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ assertTrue("Config was not updated to delete Persistent topics",
+ _config.deletePersistent());
+
+ MockAMQQueue queue = createOwnedQueue();
+
+ queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+ policy.performPolicy(queue);
+
+ assertTrue("Queue should be deleted", queue.isDeleted());
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+}
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java Thu May 20 15:18:14 2010
@@ -73,7 +73,7 @@ public class SlowConsumerTest extends Qp
setConfigurationProperty("virtualhosts.virtualhost."
+ getConnectionURL().getVirtualHost().substring(1) +
".queues.slow-consumer-detection." +
- "policy[@name]", "TopicDelete");
+ "policy.name", "TopicDelete");
setConfigurationProperty("virtualhosts.virtualhost."
+ getConnectionURL().getVirtualHost().substring(1) +
@@ -85,7 +85,7 @@ public class SlowConsumerTest extends Qp
* Queue Configuration
<slow-consumer-detection>
- <!-- The depth before which the policy will be applied-->
+ <!-- The depth before which the policy will be applied-->
<depth>4235264</depth>
<!-- The message age before which the policy will be applied-->
@@ -96,7 +96,7 @@ public class SlowConsumerTest extends Qp
<!-- Policies configuration -->
<policy>
- <name>TopicDelete"</name>
+ <name>TopicDelete</name>
<topicDelete>
<delete-persistent/>
</topicDelete>
@@ -107,10 +107,10 @@ public class SlowConsumerTest extends Qp
/**
* Plugin Configuration
- *
+
<slow-consumer-detection>
- <delay>1</delay>
- <timeunit>MINUTES</timeunit>
+ <delay>1</delay>
+ <timeunit>MINUTES</timeunit>
</slow-consumer-detection>
*/
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Thu May 20 15:18:14 2010
@@ -28,10 +28,13 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.transport.TestNetworkDriver;
@@ -196,4 +199,15 @@ public class InternalTestProtocolSession
// The alternative is to fully implement the TestIOSession to return a CloseFuture from close();
// Then the AMQMinaProtocolSession can join on the returning future without a NPE.
}
+
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ super.closeSession(session, cause, message);
+
+ // Simulate the client responding with a CloseOK.
+ // Should really update the StateManager, but we don't have access to it here:
+ // changeState(AMQState.CONNECTION_CLOSED);
+ ((AMQChannel)session).getProtocolSession().closeSession();
+
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=946665&r1=946664&r2=946665&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Thu May 20 15:18:14 2010
@@ -35,10 +35,12 @@ import org.apache.qpid.server.binding.Bi
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.AMQException;
+import javax.swing.*;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
public class MockAMQQueue implements AMQQueue
{
@@ -49,6 +51,9 @@ public class MockAMQQueue implements AMQ
private PrincipalHolder _principalHolder;
private AMQSessionModel _exclusiveOwner;
+ private AMQShortString _owner;
+ private List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
+ private boolean _autoDelete;
public MockAMQQueue(String name)
{
@@ -66,17 +71,17 @@ public class MockAMQQueue implements AMQ
public void addBinding(final Binding binding)
{
-
+ _bindings.add(binding);
}
public void removeBinding(final Binding binding)
{
-
+ _bindings.remove(binding);
}
public List<Binding> getBindings()
{
- return null;
+ return _bindings;
}
public int getBindingCount()
@@ -171,9 +176,15 @@ public class MockAMQQueue implements AMQ
public boolean isAutoDelete()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return _autoDelete;
}
+ public void setAutoDelete(boolean autodelete)
+ {
+ _autoDelete = autodelete;
+ }
+
+
public AMQShortString getOwner()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -194,17 +205,6 @@ public class MockAMQQueue implements AMQ
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -271,8 +271,9 @@ public class MockAMQQueue implements AMQ
}
public int delete() throws AMQException
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ {
+ _deleted = true;
+ return getMessageCount();
}
public void enqueue(ServerMessage message) throws AMQException
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org