You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/28 20:15:30 UTC

svn commit: r1140770 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/usecases/

Author: tabish
Date: Tue Jun 28 18:15:29 2011
New Revision: 1140770

URL: http://svn.apache.org/viewvc?rev=1140770&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-2524

When destinations are removed via JMX or by auto remove of inactive destinations an existing producer registration can be left with a reference to a resource that no longer exists but appears valid.  The destination should be marked as disposed so that a producer send can check to see if it should discard its current reference to the destination and either recreate it or update its cache to point to the currect destination resource.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JMXRemoveQueueThenSendIgnoredTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1140770&r1=1140769&r2=1140770&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Jun 28 18:15:29 2011
@@ -43,7 +43,7 @@ import org.apache.activemq.usage.Usage;
 import org.slf4j.Logger;
 
 /**
- * 
+ *
  */
 public abstract class BaseDestination implements Destination {
     /**
@@ -97,6 +97,7 @@ public abstract class BaseDestination im
     private long lastActiveTime=0l;
     private boolean reduceMemoryFootprint = false;
     protected final Scheduler scheduler;
+    private boolean disposed = false;
 
     /**
      * @param brokerService
@@ -122,7 +123,7 @@ public abstract class BaseDestination im
 
     /**
      * initialize the destination
-     * 
+     *
      * @throws Exception
      */
     public void initialize() throws Exception {
@@ -151,7 +152,7 @@ public abstract class BaseDestination im
      * Set's the interval at which warnings about producers being blocked by
      * resource usage will be triggered. Values of 0 or less will disable
      * warnings
-     * 
+     *
      * @param blockedProducerWarningInterval the interval at which warning about
      *            blocked producers will be triggered.
      */
@@ -160,7 +161,7 @@ public abstract class BaseDestination im
     }
 
     /**
-     * 
+     *
      * @return the interval at which warning about blocked producers will be
      *         triggered.
      */
@@ -218,7 +219,7 @@ public abstract class BaseDestination im
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         destinationStatistics.getProducers().decrement();
     }
-    
+
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
         destinationStatistics.getConsumers().increment();
         this.lastActiveTime=0l;
@@ -420,7 +421,7 @@ public abstract class BaseDestination im
 
     /**
      * set the dead letter strategy
-     * 
+     *
      * @param deadLetterStrategy
      */
     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
@@ -437,7 +438,7 @@ public abstract class BaseDestination im
 
     /**
      * called when message is consumed
-     * 
+     *
      * @param context
      * @param messageReference
      */
@@ -449,7 +450,7 @@ public abstract class BaseDestination im
 
     /**
      * Called when message is delivered to the broker
-     * 
+     *
      * @param context
      * @param messageReference
      */
@@ -462,7 +463,7 @@ public abstract class BaseDestination im
     /**
      * Called when a message is discarded - e.g. running low on memory This will
      * happen only if the policy is enabled - e.g. non durable topics
-     * 
+     *
      * @param context
      * @param messageReference
      */
@@ -474,7 +475,7 @@ public abstract class BaseDestination im
 
     /**
      * Called when there is a slow consumer
-     * 
+     *
      * @param context
      * @param subs
      */
@@ -489,7 +490,7 @@ public abstract class BaseDestination im
 
     /**
      * Called to notify a producer is too fast
-     * 
+     *
      * @param context
      * @param producerInfo
      */
@@ -501,7 +502,7 @@ public abstract class BaseDestination im
 
     /**
      * Called when a Usage reaches a limit
-     * 
+     *
      * @param context
      * @param usage
      */
@@ -518,6 +519,11 @@ public abstract class BaseDestination im
         }
         this.destinationStatistics.setParent(null);
         this.memoryUsage.stop();
+        this.disposed = true;
+    }
+
+    public boolean isDisposed() {
+        return this.disposed;
     }
 
     /**
@@ -585,7 +591,7 @@ public abstract class BaseDestination im
     protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
         waitForSpace(context, usage, 100, warning);
     }
-    
+
     protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
         if (systemUsage.isSendFailIfNoSpace()) {
             getLog().debug("sendFailIfNoSpace, forcing exception on send, usage:  " + usage + ": " + warning);
@@ -603,7 +609,7 @@ public abstract class BaseDestination im
                 if (context.getStopping().get()) {
                     throw new IOException("Connection closed, send aborted.");
                 }
-    
+
                 long now = System.currentTimeMillis();
                 if (now >= nextWarn) {
                     getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)");
@@ -623,7 +629,7 @@ public abstract class BaseDestination im
         return this.slowConsumerStrategy;
     }
 
-   
+
     public boolean isPrioritizedMessages() {
         return this.prioritizedMessages;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=1140770&r1=1140769&r2=1140770&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Tue Jun 28 18:15:29 2011
@@ -35,7 +35,7 @@ import org.apache.activemq.usage.MemoryU
 import org.apache.activemq.usage.Usage;
 
 /**
- * 
+ *
  */
 public interface Destination extends Service, Task {
 
@@ -62,6 +62,8 @@ public interface Destination extends Ser
 
     void dispose(ConnectionContext context) throws IOException;
 
+    boolean isDisposed();
+
     DestinationStatistics getDestinationStatistics();
 
     DeadLetterStrategy getDeadLetterStrategy();
@@ -80,14 +82,14 @@ public interface Destination extends Ser
      * Set's the interval at which warnings about producers being blocked by
      * resource usage will be triggered. Values of 0 or less will disable
      * warnings
-     * 
+     *
      * @param blockedProducerWarningInterval the interval at which warning about
      *            blocked producers will be triggered.
      */
     public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval);
 
     /**
-     * 
+     *
      * @return the interval at which warning about blocked producers will be
      *         triggered.
      */
@@ -140,14 +142,14 @@ public interface Destination extends Ser
 
     /**
      * set the lazy dispatch - default is false
-     * 
+     *
      * @param value
      */
     public void setLazyDispatch(boolean value);
 
     /**
      * Inform the Destination a message has expired
-     * 
+     *
      * @param context
      * @param subs
      * @param node
@@ -156,7 +158,7 @@ public interface Destination extends Ser
 
     /**
      * called when message is consumed
-     * 
+     *
      * @param context
      * @param messageReference
      */
@@ -164,7 +166,7 @@ public interface Destination extends Ser
 
     /**
      * Called when message is delivered to the broker
-     * 
+     *
      * @param context
      * @param messageReference
      */
@@ -173,16 +175,16 @@ public interface Destination extends Ser
     /**
      * Called when a message is discarded - e.g. running low on memory This will
      * happen only if the policy is enabled - e.g. non durable topics
-     * 
+     *
      * @param context
      * @param messageReference
-     * @param sub 
+     * @param sub
      */
     void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference);
 
     /**
      * Called when there is a slow consumer
-     * 
+     *
      * @param context
      * @param subs
      */
@@ -190,7 +192,7 @@ public interface Destination extends Ser
 
     /**
      * Called to notify a producer is too fast
-     * 
+     *
      * @param context
      * @param producerInfo
      */
@@ -198,7 +200,7 @@ public interface Destination extends Ser
 
     /**
      * Called when a Usage reaches a limit
-     * 
+     *
      * @param context
      * @param usage
      */
@@ -209,12 +211,12 @@ public interface Destination extends Ser
     /**
      * called on Queues in slave mode to allow dispatch to follow subscription
      * choice of master
-     * 
+     *
      * @param messageDispatchNotification
      * @throws Exception
      */
     void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
-    
+
     boolean isPrioritizedMessages();
 
     SlowConsumerStrategy getSlowConsumerStrategy();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=1140770&r1=1140769&r2=1140770&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Tue Jun 28 18:15:29 2011
@@ -34,8 +34,8 @@ import org.apache.activemq.usage.MemoryU
 import org.apache.activemq.usage.Usage;
 
 /**
- * 
- * 
+ *
+ *
  */
 public class DestinationFilter implements Destination {
 
@@ -61,6 +61,10 @@ public class DestinationFilter implement
         next.dispose(context);
     }
 
+    public boolean isDisposed() {
+        return next.isDisposed();
+    }
+
     public void gc() {
         next.gc();
     }
@@ -107,7 +111,7 @@ public class DestinationFilter implement
 
     /**
      * Sends a message to the given destination which may be a wildcard
-     * 
+     *
      * @param context broker context
      * @param message message to send
      * @param destination possibly wildcard destination to send the message to
@@ -137,7 +141,7 @@ public class DestinationFilter implement
     public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
         next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
     }
-    
+
     public long getBlockedProducerWarningInterval() {
         return next.getBlockedProducerWarningInterval();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1140770&r1=1140769&r2=1140770&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Jun 28 18:15:29 2011
@@ -72,8 +72,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Routes Broker operations to the correct messaging regions for processing.
- * 
- * 
+ *
+ *
  */
 public class RegionBroker extends EmptyBroker {
     public static final String ORIGINAL_EXPIRATION = "originalExpiration";
@@ -232,17 +232,17 @@ public class RegionBroker extends EmptyB
         synchronized (clientIdSet) {
             ConnectionContext oldContext = clientIdSet.get(clientId);
             if (oldContext != null) {
-            	if (context.isFaultTolerant() || context.isNetworkConnection()){
-            		//remove the old connection
-            		try{
-            			removeConnection(oldContext, info, new Exception("remove stale client"));
-            		}catch(Exception e){
-            			LOG.warn("Failed to remove stale connection ",e);
-            		}
-            	}else{
+                if (context.isFaultTolerant() || context.isNetworkConnection()){
+                    //remove the old connection
+                    try{
+                        removeConnection(oldContext, info, new Exception("remove stale client"));
+                    }catch(Exception e){
+                        LOG.warn("Failed to remove stale connection ",e);
+                    }
+                }else{
                 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
                                                    + oldContext.getConnection().getRemoteAddress());
-            	}
+                }
             } else {
                 clientIdSet.put(clientId, context);
             }
@@ -496,7 +496,8 @@ public class RegionBroker extends EmptyB
     public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
         message.setBrokerInTime(System.currentTimeMillis());
         if (producerExchange.isMutable() || producerExchange.getRegion() == null
-                || (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)) {
+                || (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)
+                || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
             ActiveMQDestination destination = message.getDestination();
             // ensure the destination is registered with the RegionBroker
             producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
@@ -520,6 +521,7 @@ public class RegionBroker extends EmptyB
             producerExchange.setRegion(region);
             producerExchange.setRegionDestination(null);
         }
+
         producerExchange.getRegion().send(producerExchange, message);
     }
 
@@ -793,18 +795,18 @@ public class RegionBroker extends EmptyB
         }
         return expired;
     }
-   
+
     private boolean stampAsExpired(Message message) throws IOException {
         boolean stamped=false;
         if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
-            long expiration=message.getExpiration();     
+            long expiration=message.getExpiration();
             message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
             stamped = true;
         }
         return stamped;
     }
 
-    
+
     @Override
     public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
         if (LOG.isDebugEnabled()) {
@@ -812,51 +814,51 @@ public class RegionBroker extends EmptyB
         }
         getRoot().sendToDeadLetterQueue(context, node, subscription);
     }
-    
+
     @Override
     public void sendToDeadLetterQueue(ConnectionContext context,
-	        MessageReference node, Subscription subscription){
-		try{
-			if(node!=null){
-				Message message=node.getMessage();
-				if(message!=null && node.getRegionDestination()!=null){
-					DeadLetterStrategy deadLetterStrategy=node
-					        .getRegionDestination().getDeadLetterStrategy();
-					if(deadLetterStrategy!=null){
-						if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
-						    // message may be inflight to other subscriptions so do not modify
-						    message = message.copy();
-						    stampAsExpired(message);
-						    message.setExpiration(0);
-						    if(!message.isPersistent()){
-							    message.setPersistent(true);
-							    message.setProperty("originalDeliveryMode",
-								        "NON_PERSISTENT");
-							}
-							// The original destination and transaction id do
-							// not get filled when the message is first sent,
-							// it is only populated if the message is routed to
-							// another destination like the DLQ
-							ActiveMQDestination deadLetterDestination=deadLetterStrategy
-							        .getDeadLetterQueueFor(message, subscription);
-							if (context.getBroker()==null) {
-								context.setBroker(getRoot());
-							}
-							BrokerSupport.resendNoCopy(context,message,
-							        deadLetterDestination);
-						}
-					} else {
-					    if (LOG.isDebugEnabled()) {
-					        LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
+            MessageReference node, Subscription subscription){
+        try{
+            if(node!=null){
+                Message message=node.getMessage();
+                if(message!=null && node.getRegionDestination()!=null){
+                    DeadLetterStrategy deadLetterStrategy=node
+                            .getRegionDestination().getDeadLetterStrategy();
+                    if(deadLetterStrategy!=null){
+                        if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
+                            // message may be inflight to other subscriptions so do not modify
+                            message = message.copy();
+                            stampAsExpired(message);
+                            message.setExpiration(0);
+                            if(!message.isPersistent()){
+                                message.setPersistent(true);
+                                message.setProperty("originalDeliveryMode",
+                                        "NON_PERSISTENT");
+                            }
+                            // The original destination and transaction id do
+                            // not get filled when the message is first sent,
+                            // it is only populated if the message is routed to
+                            // another destination like the DLQ
+                            ActiveMQDestination deadLetterDestination=deadLetterStrategy
+                                    .getDeadLetterQueueFor(message, subscription);
+                            if (context.getBroker()==null) {
+                                context.setBroker(getRoot());
+                            }
+                            BrokerSupport.resendNoCopy(context,message,
+                                    deadLetterDestination);
+                        }
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
                                     + message.getMessageId() + ", destination: " + message.getDestination());
-					    }
-					}
-				}
-			}
-		}catch(Exception e){
-			LOG.warn("Caught an exception sending to DLQ: "+node,e);
-		}
-	}
+                        }
+                    }
+                }
+            }
+        }catch(Exception e){
+            LOG.warn("Caught an exception sending to DLQ: "+node,e);
+        }
+    }
 
     @Override
     public Broker getRoot() {
@@ -867,7 +869,7 @@ public class RegionBroker extends EmptyB
             throw new RuntimeException("The broker from the BrokerService should not throw an exception");
         }
     }
-    
+
     /**
      * @return the broker sequence id
      */
@@ -877,17 +879,17 @@ public class RegionBroker extends EmptyB
             return sequenceGenerator.getNextSequenceId();
         }
     }
-    
-    
+
+
     @Override
     public Scheduler getScheduler() {
         return this.scheduler;
     }
-    
+
     public ThreadPoolExecutor getExecutor() {
         return this.executor;
     }
-    
+
     @Override
     public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
         ActiveMQDestination destination = control.getDestination();
@@ -899,20 +901,20 @@ public class RegionBroker extends EmptyB
         case ActiveMQDestination.TOPIC_TYPE:
             topicRegion.processConsumerControl(consumerExchange, control);
             break;
-            
+
         case ActiveMQDestination.TEMP_QUEUE_TYPE:
             tempQueueRegion.processConsumerControl(consumerExchange, control);
             break;
-            
+
         case ActiveMQDestination.TEMP_TOPIC_TYPE:
             tempTopicRegion.processConsumerControl(consumerExchange, control);
             break;
-            
+
         default:
             LOG.warn("unmatched destination: " + destination + ", in consumerControl: "  + control);
         }
     }
-    
+
     protected void addBrokerInClusterUpdate() {
         List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
         for (TransportConnector connector : connectors) {
@@ -930,7 +932,7 @@ public class RegionBroker extends EmptyB
             }
         }
     }
-    
+
     protected void purgeInactiveDestinations() {
         synchronized (purgeInactiveDestinationsTask) {
             List<BaseDestination> list = new ArrayList<BaseDestination>();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JMXRemoveQueueThenSendIgnoredTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JMXRemoveQueueThenSendIgnoredTest.java?rev=1140770&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JMXRemoveQueueThenSendIgnoredTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JMXRemoveQueueThenSendIgnoredTest.java Tue Jun 28 18:15:29 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMXRemoveQueueThenSendIgnoredTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JMXRemoveQueueThenSendIgnoredTest.class);
+
+    private BrokerService brokerService;
+    private MessageProducer producer;
+    private QueueSession session;
+    private QueueConnection connection;
+    private Queue queue;
+    private int count = 1;
+
+    @Before
+    public void setUp() throws Exception  {
+        brokerService = new BrokerService();
+        brokerService.setBrokerName("dev");
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+
+        final String brokerUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+
+        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUri);
+        connection = activeMQConnectionFactory.createQueueConnection();
+        session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE/*SESSION_TRANSACTED*/);
+        queue = session.createQueue("myqueue");
+        producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        connection.start();
+    }
+
+    @Test
+    public void testRemoveQueueAndProduceAfterNewConsumerAdded() throws Exception {
+        MessageConsumer firstConsumer = registerConsumer();
+        produceMessage();
+        Message message = firstConsumer.receive(5000);
+        LOG.debug("Received message " + message);
+
+        assertEquals(1, numberOfMessages());
+        firstConsumer.close();
+        session.commit();
+        Thread.sleep(1000);
+
+        removeQueue();
+        Thread.sleep(1000);
+
+        MessageConsumer secondConsumer = registerConsumer();
+        produceMessage();
+        message = secondConsumer.receive(5000);
+        LOG.debug("Received message " + message);
+
+        assertEquals(1, numberOfMessages());
+        secondConsumer.close();
+    }
+
+    @Test
+    public void testRemoveQueueAndProduceBeforeNewConsumerAdded() throws Exception {
+        MessageConsumer firstConsumer = registerConsumer();
+        produceMessage();
+        Message message = firstConsumer.receive(5000);
+        LOG.debug("Received message " + message);
+
+        assertEquals(1, numberOfMessages());
+        firstConsumer.close();
+        session.commit();
+        Thread.sleep(1000);
+
+        removeQueue();
+        Thread.sleep(1000);
+
+        produceMessage();
+        MessageConsumer secondConsumer = registerConsumer();
+        message = secondConsumer.receive(5000);
+        LOG.debug("Received message " + message);
+
+        assertEquals(1, numberOfMessages());
+        secondConsumer.close();
+    }
+
+    private MessageConsumer registerConsumer() throws JMSException {
+        MessageConsumer consumer = session.createConsumer(queue);
+        return consumer;
+    }
+
+    private int numberOfMessages() throws Exception {
+        JMXConnector jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"));
+        MBeanServerConnection mbeanServerConnection = jmxConnector.getMBeanServerConnection();
+        String beanId = "org.apache.activemq:BrokerName=dev,Type=Queue,Destination=myqueue";
+        List<?> object = (List<?>) mbeanServerConnection.invoke(new ObjectName(beanId), "browseMessages", null, null);
+        jmxConnector.close();
+        return object.size();
+    }
+
+    private void removeQueue() throws Exception {
+        LOG.debug("Removing Destination: myqueue");
+        brokerService.getAdminView().removeQueue("myqueue");
+    }
+
+    private void produceMessage() throws JMSException {
+        TextMessage textMessage = session.createTextMessage();
+        textMessage.setText("Sending message: " + count++);
+        LOG.debug("Sending message: " + textMessage);
+        producer.send(textMessage);
+        session.commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        connection.close();
+        brokerService.stop();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JMXRemoveQueueThenSendIgnoredTest.java
------------------------------------------------------------------------------
    svn:eol-style = native