You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/12/16 18:07:24 UTC
svn commit: r1050059 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/RegionBroker.java
test/java/org/apache/activemq/broker/jmx/PurgeTest.java
Author: dejanb
Date: Thu Dec 16 17:07:24 2010
New Revision: 1050059
URL: http://svn.apache.org/viewvc?rev=1050059&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3092 - Deleting a Queue from the console results in lost messages
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1050059&r1=1050058&r2=1050059&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Dec 16 17:07:24 2010
@@ -492,7 +492,8 @@ public class RegionBroker extends EmptyB
@Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
message.setBrokerInTime(System.currentTimeMillis());
- if (producerExchange.isMutable() || producerExchange.getRegion() == null) {
+ if (producerExchange.isMutable() || producerExchange.getRegion() == null
+ || (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)) {
ActiveMQDestination destination = message.getDestination();
// ensure the destination is registered with the RegionBroker
producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false);
@@ -514,6 +515,7 @@ public class RegionBroker extends EmptyB
throw createUnknownDestinationTypeException(destination);
}
producerExchange.setRegion(region);
+ producerExchange.setRegionDestination(null);
}
producerExchange.getRegion().send(producerExchange, message);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java?rev=1050059&r1=1050058&r2=1050059&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java Thu Dec 16 17:07:24 2010
@@ -16,10 +16,7 @@
*/
package org.apache.activemq.broker.jmx;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import javax.jms.*;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
@@ -111,6 +108,38 @@ public class PurgeTest extends EmbeddedB
addCombinationValues("persistenceAdapter", new Object[] {new MemoryPersistenceAdapter(), new AMQPersistenceAdapter(), new JDBCPersistenceAdapter()});
}
+ public void testDeleteSameProducer() throws Exception {
+ connection = connectionFactory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination();
+
+ MessageProducer producer = session.createProducer(destination);
+ Message message = session.createTextMessage("Test Message");
+ producer.send(message);
+
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ Message received = consumer.receive(1000);
+ assertEquals(message, received);
+
+ ObjectName brokerViewMBeanName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+ BrokerViewMBean brokerProxy = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true);
+
+ brokerProxy.removeQueue(getDestinationString());
+
+
+ producer.send(message);
+
+ received = consumer.receive(1000);
+
+ assertNotNull("Message not received", received);
+ assertEquals(message, received);
+
+
+ }
+
public void testDelete() throws Exception {
// Send some messages
connection = connectionFactory.createConnection();