You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/27 00:20:54 UTC

svn commit: r1354270 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Author: tabish
Date: Tue Jun 26 22:20:51 2012
New Revision: 1354270

URL: http://svn.apache.org/viewvc?rev=1354270&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-3846

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1354270&r1=1354269&r2=1354270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jun 26 22:20:51 2012
@@ -24,6 +24,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -1208,7 +1209,7 @@ public class Queue extends BaseDestinati
      */
     public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
         int movedCounter = 0;
-        Set<MessageReference> set = new HashSet<MessageReference>();
+        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
         ConnectionContext context = createConnectionContext();
         do {
             doPageIn(true);
@@ -1273,7 +1274,7 @@ public class Queue extends BaseDestinati
             int maximumMessages) throws Exception {
         int movedCounter = 0;
         int count = 0;
-        Set<MessageReference> set = new HashSet<MessageReference>();
+        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
         do {
             int oldMaxSize = getMaxPageSize();
             setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
@@ -1364,7 +1365,7 @@ public class Queue extends BaseDestinati
     public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
             ActiveMQDestination dest, int maximumMessages) throws Exception {
         int movedCounter = 0;
-        Set<QueueMessageReference> set = new HashSet<QueueMessageReference>();
+        Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
         do {
             doPageIn(true);
             pagedInMessagesLock.readLock().lock();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1354270&r1=1354269&r2=1354270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Tue Jun 26 22:20:51 2012
@@ -1078,6 +1078,120 @@ public class MBeanTest extends EmbeddedB
         assertTrue("Should find the connection's ManagedTransportConnection", found);
     }
 
+    public void testMoveMessagesToRetainOrder() throws Exception {
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+        String newDestination = getSecondDestinationString();
+        queue.moveMatchingMessagesTo("", newDestination);
+
+        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+
+        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+        int movedSize = MESSAGE_COUNT;
+        assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(newDestination);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        int last = -1;
+        int current = -1;
+        Message message = null;
+        while ((message = consumer.receive(2000)) != null) {
+            if (message.propertyExists("counter")) {
+                current = message.getIntProperty("counter");
+                assertEquals(last, current - 1);
+                last = current;
+            }
+        }
+
+        // now lets remove them by selector
+        queue.removeMatchingMessages("");
+
+        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+    }
+
+    public void testCopyMessagesToRetainOrder() throws Exception {
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+        String newDestination = getSecondDestinationString();
+        queue.copyMatchingMessagesTo("", newDestination);
+
+        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+
+        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+        int movedSize = MESSAGE_COUNT;
+        assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(newDestination);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        int last = -1;
+        int current = -1;
+        Message message = null;
+        while ((message = consumer.receive(2000)) != null) {
+            if (message.propertyExists("counter")) {
+                current = message.getIntProperty("counter");
+                assertEquals(last, current - 1);
+                last = current;
+            }
+        }
+
+        // now lets remove them by selector
+        queue.removeMatchingMessages("");
+
+        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+    }
+
+    public void testRemoveMatchingMessageRetainOrder() throws Exception {
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+        String queueName = getDestinationString();
+        queue.removeMatchingMessages("counter < 10");
+
+        int newSize = MESSAGE_COUNT - 10;
+        assertEquals("Unexpected number of messages ", newSize, queue.getQueueSize());
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(queueName);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        int last = 9;
+        int current = 0;
+        Message message = null;
+        while ((message = consumer.receive(2000)) != null) {
+            if (message.propertyExists("counter")) {
+                current = message.getIntProperty("counter");
+                assertEquals(last, current - 1);
+                last = current;
+            }
+        }
+
+        // now lets remove them by selector
+        queue.removeMatchingMessages("");
+
+        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+    }
+
     public void testBrowseBytesMessages() throws Exception {
         connection = connectionFactory.createConnection();
         useConnectionWithByteMessage(connection);