You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/27 00:20:54 UTC
svn commit: r1354270 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/Queue.java
test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Author: tabish
Date: Tue Jun 26 22:20:51 2012
New Revision: 1354270
URL: http://svn.apache.org/viewvc?rev=1354270&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-3846
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1354270&r1=1354269&r2=1354270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jun 26 22:20:51 2012
@@ -24,6 +24,7 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -1208,7 +1209,7 @@ public class Queue extends BaseDestinati
*/
public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
int movedCounter = 0;
- Set<MessageReference> set = new HashSet<MessageReference>();
+ Set<MessageReference> set = new LinkedHashSet<MessageReference>();
ConnectionContext context = createConnectionContext();
do {
doPageIn(true);
@@ -1273,7 +1274,7 @@ public class Queue extends BaseDestinati
int maximumMessages) throws Exception {
int movedCounter = 0;
int count = 0;
- Set<MessageReference> set = new HashSet<MessageReference>();
+ Set<MessageReference> set = new LinkedHashSet<MessageReference>();
do {
int oldMaxSize = getMaxPageSize();
setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
@@ -1364,7 +1365,7 @@ public class Queue extends BaseDestinati
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
ActiveMQDestination dest, int maximumMessages) throws Exception {
int movedCounter = 0;
- Set<QueueMessageReference> set = new HashSet<QueueMessageReference>();
+ Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
do {
doPageIn(true);
pagedInMessagesLock.readLock().lock();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1354270&r1=1354269&r2=1354270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Tue Jun 26 22:20:51 2012
@@ -1078,6 +1078,120 @@ public class MBeanTest extends EmbeddedB
assertTrue("Should find the connection's ManagedTransportConnection", found);
}
+ public void testMoveMessagesToRetainOrder() throws Exception { // Moves messages to a second queue through the JMX view and checks their "counter" values still arrive in sequence.
+ connection = connectionFactory.createConnection();
+ useConnection(connection); // helper defined outside this excerpt; presumably publishes MESSAGE_COUNT messages carrying an int "counter" property - TODO confirm
+ // Resolve the source queue's MBean name and wrap it in a JMX proxy.
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ // Move using an empty selector; the size assertion below expects MESSAGE_COUNT messages on the target.
+ String newDestination = getSecondDestinationString();
+ queue.moveMatchingMessagesTo("", newDestination);
+ // From here on, "queue" is re-pointed at the destination queue's MBean.
+ queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+
+ queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ int movedSize = MESSAGE_COUNT;
+ assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
+ // Consume from the destination queue to observe the delivery order.
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(newDestination);
+ MessageConsumer consumer = session.createConsumer(destination);
+ // "last" starts at -1, so the first counter seen must be 0.
+ int last = -1;
+ int current = -1;
+ Message message = null;
+ while ((message = consumer.receive(2000)) != null) { // loop ends once receive(2000) returns null
+ if (message.propertyExists("counter")) { // messages without the property are skipped, not failed
+ current = message.getIntProperty("counter");
+ assertEquals(last, current - 1); // each counter must be exactly one more than the previous one
+ last = current;
+ }
+ }
+
+ // Remove whatever is left using an empty selector; the assertions below expect an empty queue.
+ queue.removeMatchingMessages("");
+
+ assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+ assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+ }
+
+ public void testCopyMessagesToRetainOrder() throws Exception { // Copies messages to a second queue through the JMX view and checks their "counter" values still arrive in sequence.
+ connection = connectionFactory.createConnection();
+ useConnection(connection); // helper defined outside this excerpt; presumably publishes MESSAGE_COUNT messages carrying an int "counter" property - TODO confirm
+ // Resolve the source queue's MBean name and wrap it in a JMX proxy.
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ // Copy using an empty selector; the size assertion below expects MESSAGE_COUNT messages on the target.
+ String newDestination = getSecondDestinationString();
+ queue.copyMatchingMessagesTo("", newDestination);
+ // From here on, "queue" is re-pointed at the destination queue's MBean; the source queue is not inspected again.
+ queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+
+ queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ int movedSize = MESSAGE_COUNT;
+ assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
+ // Consume from the destination queue to observe the delivery order.
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(newDestination);
+ MessageConsumer consumer = session.createConsumer(destination);
+ // "last" starts at -1, so the first counter seen must be 0.
+ int last = -1;
+ int current = -1;
+ Message message = null;
+ while ((message = consumer.receive(2000)) != null) { // loop ends once receive(2000) returns null
+ if (message.propertyExists("counter")) { // messages without the property are skipped, not failed
+ current = message.getIntProperty("counter");
+ assertEquals(last, current - 1); // each counter must be exactly one more than the previous one
+ last = current;
+ }
+ }
+
+ // Remove whatever is left on the destination queue using an empty selector; the assertions below expect it to be empty.
+ queue.removeMatchingMessages("");
+
+ assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+ assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+ }
+
+ public void testRemoveMatchingMessageRetainOrder() throws Exception { // Removes a selector-matched subset through the JMX view and checks the remaining "counter" values still arrive in sequence.
+ connection = connectionFactory.createConnection();
+ useConnection(connection); // helper defined outside this excerpt; presumably publishes MESSAGE_COUNT messages carrying an int "counter" property - TODO confirm
+ // Resolve the queue's MBean name and wrap it in a JMX proxy.
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ // Remove messages matching "counter < 10"; the test expects exactly 10 messages to be removed.
+ String queueName = getDestinationString();
+ queue.removeMatchingMessages("counter < 10");
+
+ int newSize = MESSAGE_COUNT - 10;
+ assertEquals("Unexpected number of messages ", newSize, queue.getQueueSize());
+ // Consume from the same queue to observe the order of what remains.
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(destination);
+ // "last" starts at 9, so the first remaining counter seen must be 10.
+ int last = 9;
+ int current = 0;
+ Message message = null;
+ while ((message = consumer.receive(2000)) != null) { // loop ends once receive(2000) returns null
+ if (message.propertyExists("counter")) { // messages without the property are skipped, not failed
+ current = message.getIntProperty("counter");
+ assertEquals(last, current - 1); // each counter must be exactly one more than the previous one
+ last = current;
+ }
+ }
+
+ // Remove whatever is left using an empty selector; the assertions below expect an empty queue.
+ queue.removeMatchingMessages("");
+
+ assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+ assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+ }
+
public void testBrowseBytesMessages() throws Exception {
connection = connectionFactory.createConnection();
useConnectionWithByteMessage(connection);