You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/02/28 15:23:31 UTC
svn commit: r1075346 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/Queue.java
test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Author: dejanb
Date: Mon Feb 28 14:23:30 2011
New Revision: 1075346
URL: http://svn.apache.org/viewvc?rev=1075346&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3193 - consumers don't get messages after JMX remove
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1075346&r1=1075345&r2=1075346&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Feb 28 14:23:30 2011
@@ -1437,6 +1437,23 @@ public class Queue extends BaseDestinati
} catch (Throwable e) {
LOG.error("Failed to page in more queue messages ", e);
}
+ } else {
+ // if there are already paged messages
+ // dispatch them
+ if (pagedInMessages.size() != 0) {
+ pagedInMessagesLock.writeLock().lock();
+ ArrayList paged = new ArrayList();
+ try {
+ paged.addAll(pagedInMessages.values());
+ } finally {
+ pagedInMessagesLock.writeLock().unlock();
+ }
+ try {
+ doDispatch(paged);
+ } catch (Exception e) {
+ LOG.error("Failed to dispatch already paged messages ", e);
+ }
+ }
}
if (pendingBrowserDispatch != null) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1075346&r1=1075345&r2=1075346&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Mon Feb 28 14:23:30 2011
@@ -48,6 +48,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.util.JMXSupport;
@@ -161,6 +162,39 @@ public class MBeanTest extends EmbeddedB
assertTrue("cache enabled", queueNew.isCacheEnabled());
}
+ public void testRemoveMessages() throws Exception { // AMQ-3193 scenario: a consumer must still receive messages after one was removed through the queue MBean
+ ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+ BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+ broker.addQueue(getDestinationString());
+ // look up the MBean registered for the queue that was just added
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+ // proxy the queue MBean so messages can be sent and removed through it
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ String msg1 = queue.sendTextMessage("message 1");
+ String msg2 = queue.sendTextMessage("message 2");
+ // remove the second message before any consumer has been created
+ assertTrue(queue.removeMessage(msg2));
+ // open a connection and a non-transacted, auto-acknowledge session
+ connection = connectionFactory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination dest = createDestination();
+ // the first message received must be msg1 (msg2 was removed above)
+ MessageConsumer consumer = session.createConsumer(dest);
+ Message message = consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(msg1, message.getJMSMessageID());
+ // a message sent after the removal must also reach the same consumer
+ String msg3 = queue.sendTextMessage("message 3");
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(msg3, message.getJMSMessageID());
+ // no further message is expected: receive must return null within the timeout
+ message = consumer.receive(1000);
+ assertNull(message);
+
+ }
+
public void testRetryMessages() throws Exception {
// lets speed up redelivery
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;