You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/04/15 13:37:45 UTC

svn commit: r765141 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/BaseDestination.java test/java/org/apache/activemq/CombinationTestSupport.java test/java/org/apache/activemq/broker/jmx/PurgeTest.java

Author: dejanb
Date: Wed Apr 15 11:37:45 2009
New Revision: 765141

URL: http://svn.apache.org/viewvc?rev=765141&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2209 - deleting queue does not remove messages

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CombinationTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=765141&r1=765140&r2=765141&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed Apr 15 11:37:45 2009
@@ -431,6 +431,7 @@
 
     public void dispose(ConnectionContext context) throws IOException {
         if (this.store != null) {
+        	this.store.removeAllMessages(context);
             this.store.dispose(context);
         }
         this.destinationStatistics.setParent(null);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CombinationTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CombinationTestSupport.java?rev=765141&r1=765140&r2=765141&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CombinationTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CombinationTestSupport.java Wed Apr 15 11:37:45 2009
@@ -219,7 +219,11 @@
     }
 
     public String getName() {
-        if (options != null) {
+    	return getName(false);
+    }
+    
+    public String getName(boolean original) {
+        if (options != null && !original) {
             return super.getName() + " " + options;
         }
         return super.getName();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java?rev=765141&r1=765140&r2=765141&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java Wed Apr 15 11:37:45 2009
@@ -25,9 +25,15 @@
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
+import junit.framework.Test;
 import junit.textui.TestRunner;
+
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -47,10 +53,15 @@
     protected boolean transacted;
     protected int authMode = Session.AUTO_ACKNOWLEDGE;
     protected int messageCount = 10;
+    public PersistenceAdapter persistenceAdapter; 
 
     public static void main(String[] args) {
         TestRunner.run(PurgeTest.class);
     }
+    
+    public static Test suite() {
+    	return suite(PurgeTest.class);
+    }
 
     public void testPurge() throws Exception {
         // Send some messages
@@ -91,6 +102,68 @@
         count = proxy.getQueueSize();
         assertEquals("Queue size", count, 0);
     }
+    /** Registers memory, AMQ and JDBC persistence adapters as combination values for the {@code persistenceAdapter} field. */
+    public void initCombosForTestDelete() {
+    	addCombinationValues("persistenceAdapter", new Object[] {new MemoryPersistenceAdapter(), new AMQPersistenceAdapter(), new JDBCPersistenceAdapter()});
+    }
+    /** Verifies that removing a queue via the broker MBean also discards its stored messages (AMQ-2209). */
+    public void testDelete() throws Exception {
+        // Send some messages
+        connection = connectionFactory.createConnection();
+        connection.setClientID(clientID);
+        connection.start();
+        Session session = connection.createSession(transacted, authMode);
+        destination = createDestination();
+        sendMessages(session, messageCount);
+
+        // Look up the queue and broker MBeans used to inspect and remove the queue
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+        QueueViewMBean queueProxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+        ObjectName brokerViewMBeanName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+        BrokerViewMBean brokerProxy = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true);
+
+        long count = queueProxy.getQueueSize();
+        assertEquals("Queue size", messageCount, count);
+
+        brokerProxy.removeQueue(getDestinationString());
+        // Nothing may survive the removal: after re-sending, only the new messages are counted
+        sendMessages(session, messageCount);
+
+        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+        queueProxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+        count = queueProxy.getQueueSize();
+        assertEquals("Queue size", messageCount, count);
+
+        queueProxy.purge();
+
+        // Queues have a special case once there are more than a thousand
+        // dead messages, make sure we hit that.
+        messageCount += 1000;
+        sendMessages(session, messageCount);
+
+        count = queueProxy.getQueueSize();
+        assertEquals("Queue size", messageCount, count);
+
+        brokerProxy.removeQueue(getDestinationString());
+
+        sendMessages(session, messageCount);
+
+        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+        queueProxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+        count = queueProxy.getQueueSize();
+        assertEquals("Queue size", messageCount, count);
+    }
+    /** Sends {@code count} text messages to {@code destination} through a producer created from {@code session}. */
+    private void sendMessages(Session session, int count) throws Exception {
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 0; i < count; i++) { // bounded by the parameter, not the messageCount field
+            Message message = session.createTextMessage("Message: " + i);
+            producer.send(message);
+        }
+    }
 
     protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
         ObjectName objectName = new ObjectName(name);
@@ -121,12 +194,20 @@
         BrokerService answer = new BrokerService();
         answer.setUseJmx(true);
         answer.setEnableStatistics(true);
-        answer.setPersistent(false);
         answer.addConnector(bindAddress);
+        answer.setPersistenceAdapter(persistenceAdapter);
+        answer.deleteAllMessages();
         return answer;
     }
 
     protected void echo(String text) {
         LOG.info(text);
     }
+    
+    /**
+     * Returns the destination name for this test: the class name, a dot, and {@code getName(true)} (the test name without the combination options suffix)
+     */
+    protected String getDestinationString() {
+        return getClass().getName() + "." + getName(true);
+    }
 }