You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/04/09 12:36:00 UTC

svn commit: r932342 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/bugs/ test/java/org/apache/activemq/...

Author: gtully
Date: Fri Apr  9 10:36:00 2010
New Revision: 932342

URL: http://svn.apache.org/viewvc?rev=932342&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2691 - ensure suppressed durable subs are acked so that message can be removed and not recovered at a later date. Tidy up network bridge creation so that durables are supported by default and allow dynamicOnly to determine if durables are auto bridged. Avoid duplicate mbeans for durable subs on restart and allow active to reflect status of a sub

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=932342&r1=932341&r2=932342&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri Apr  9 10:36:00 2010
@@ -29,6 +29,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -338,14 +339,16 @@ public class ManagedRegionBroker extends
         if (destinations != null) {
             for (Iterator iter = destinations.iterator(); iter.hasNext();) {
                 ActiveMQDestination dest = (ActiveMQDestination)iter.next();
-                if (dest.isTopic()) {
+                if (dest.isTopic()) {                
                     SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
                     if (infos != null) {
                         for (int i = 0; i < infos.length; i++) {
                             SubscriptionInfo info = infos[i];
-                            LOG.debug("Restoring durable subscription: " + info);
                             SubscriptionKey key = new SubscriptionKey(info);
-                            subscriptions.put(key, info);
+                            if (!alreadyKnown(key)) {
+                                LOG.debug("Restoring durable subscription mbean: " + info);
+                                subscriptions.put(key, info);
+                            }
                         }
                     }
                 }
@@ -359,6 +362,15 @@ public class ManagedRegionBroker extends
         }
     }
 
+    private boolean alreadyKnown(SubscriptionKey key) {
+        boolean known = false;
+        known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") +  " already registered");
+        }
+        return known;
+    }
+
     protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
         Hashtable map = brokerObjectName.getKeyPropertyList();
         try {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java?rev=932342&r1=932341&r2=932342&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java Fri Apr  9 10:36:00 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.TopicSubscription;
 
 /**
@@ -58,5 +59,15 @@ public class TopicSubscriptionView exten
         }
     }
 
+    @Override
+    public boolean isActive() {
+        if (subscription instanceof DurableTopicSubscription) {
+            return ((DurableTopicSubscription)subscription).isActive();
+        } else {
+            return super.isActive();
+        }
+    }
+
+    
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=932342&r1=932341&r2=932342&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Apr  9 10:36:00 2010
@@ -283,6 +283,7 @@ public abstract class PrefetchSubscripti
                     MessageId messageId = node.getMessageId();
                     if (ack.getLastMessageId().equals(messageId)) {
                         // this should never be within a transaction
+                        dequeueCounter++;
                         node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                         destination = node.getRegionDestination();
                         acknowledge(context, ack, node);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=932342&r1=932341&r2=932342&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Fri Apr  9 10:36:00 2010
@@ -170,7 +170,7 @@ public class TopicRegion extends Abstrac
             for (int i = 0; i < infos.length; i++) {
 
                 SubscriptionInfo info = infos[i];
-                LOG.debug("Restoring durable subscription: " + infos);
+                LOG.debug("Restoring durable subscription: " + info);
                 SubscriptionKey key = new SubscriptionKey(info);
 
                 // A single durable sub may be subscribing to multiple topics.
@@ -313,4 +313,8 @@ public class TopicRegion extends Abstrac
         this.keepDurableSubsActive = keepDurableSubsActive;
     }
 
+    public boolean durableSubscriptionExists(SubscriptionKey key) {
+        return this.durableSubscriptions.containsKey(key);
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=932342&r1=932341&r2=932342&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Apr  9 10:36:00 2010
@@ -30,6 +30,7 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
@@ -65,6 +66,7 @@ import org.apache.activemq.command.Sessi
 import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
@@ -88,9 +90,9 @@ import org.apache.commons.logging.LogFac
  * @version $Revision$
  */
 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
-
     private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class);
     private static final ThreadPoolExecutor ASYNC_TASKS;
+    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
     protected final Transport localBroker;
     protected final Transport remoteBroker;
     protected final IdGenerator idGenerator = new IdGenerator();
@@ -677,45 +679,41 @@ public abstract class DemandForwardingBr
                     final MessageDispatch md = (MessageDispatch) command;
                     final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
                     if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
-                        // See if this consumer's brokerPath tells us it came from the broker at the other end
-                        // of the bridge. I think we should be making this decision based on the message's
-                        // broker bread crumbs and not the consumer's? However, the message's broker bread
-                        // crumbs are null, which is another matter.   
-                        boolean cameFromRemote = false;
-                        Object consumerInfo = md.getMessage().getDataStructure();
-                        if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo))
-                            cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
-
+                        
+                        if (originallyCameFromRemote(md, sub)) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL: " + md.getMessage());
+                            }
+                            // still ack as it may be durable
+                            try {
+                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
+                            } finally {
+                                sub.decrementOutstandingResponses();
+                            }
+                            return;
+                        }
+                        
                         Message message = configureMessage(md);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
                         }
-
+                        
                         if (!message.isResponseRequired()) {
-
+                            
                             // If the message was originally sent using async
                             // send, we will preserve that QOS
                             // by bridging it using an async send (small chance
                             // of message loss).
-
                             try {
-                                // Don't send it off to the remote if it originally came from the remote. 
-                                if (!cameFromRemote) {
-                                    remoteBroker.oneway(message);
-                                } else {
-                                    if (LOG.isDebugEnabled()) {
-                                        LOG.debug("Message not forwarded on to remote, because message came from remote");
-                                    }
-                                }
-
+                                remoteBroker.oneway(message);
                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                 dequeueCounter.incrementAndGet();
                             } finally {
                                 sub.decrementOutstandingResponses();
                             }
-
+                            
                         } else {
-
+                            
                             // The message was not sent using async send, so we
                             // should only ack the local
                             // broker when we get confirmation that the remote
@@ -730,8 +728,7 @@ public abstract class DemandForwardingBr
                                         } else {
                                             localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                             dequeueCounter.incrementAndGet();
-
-                                        }
+                                        }   
                                     } catch (IOException e) {
                                         serviceLocalException(e);
                                     } finally {
@@ -739,10 +736,10 @@ public abstract class DemandForwardingBr
                                     }
                                 }
                             };
-
+                            
                             remoteBroker.asyncRequest(message, callback);
+                            
                         }
-
                     } else {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
@@ -779,6 +776,27 @@ public abstract class DemandForwardingBr
         }
     }
 
+    private boolean originallyCameFromRemote(MessageDispatch md, DemandSubscription sub) throws Exception {
+        // See if this consumer's brokerPath tells us it came from the broker at the other end
+        // of the bridge. I think we should be making this decision based on the message's
+        // broker bread crumbs and not the consumer's? However, the message's broker bread
+        // crumbs are null, which is another matter.   
+        boolean cameFromRemote = false;
+        Object consumerInfo = md.getMessage().getDataStructure();
+        if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) {
+            cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
+        }
+        
+        // for durable subs, suppression via filter leaves dangling acks so we need to 
+        // check here and allow the ack irrespective
+        if (!cameFromRemote && sub.getLocalInfo().isDurable()) {
+            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
+            messageEvalContext.setMessageReference(md.getMessage());
+            cameFromRemote = !createNetworkBridgeFilter(null).matches(messageEvalContext);
+        }  
+        return cameFromRemote;
+    }
+
     /**
      * @return Returns the dynamicallyIncludedDestinations.
      */
@@ -1130,9 +1148,14 @@ public abstract class DemandForwardingBr
         subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
         subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
 
-        // This works for now since we use a VM connection to the local broker.
-        // may need to change if we ever subscribe to a remote broker.
-        sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
+        if (!info.isDurable()) {
+            // This works for now since we use a VM connection to the local broker.
+            // may need to change if we ever subscribe to a remote broker.
+            sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
+        } else  {
+            // need to ack this message if it is ignored as it is durable so
+            // we check before we send. see: originallyCameFromRemote()
+        }
     }
 
     protected void removeDemandSubscription(ConsumerId id) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=932342&r1=932341&r2=932342&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Fri Apr  9 10:36:00 2010
@@ -54,7 +54,7 @@ public class DurableConduitBridge extend
      */
     protected void setupStaticDestinations() {
         super.setupStaticDestinations();
-        ActiveMQDestination[] dests = durableDestinations;
+        ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
         if (dests != null) {
             for (int i = 0; i < dests.length; i++) {
                 ActiveMQDestination dest = dests[i];
@@ -96,7 +96,7 @@ public class DurableConduitBridge extend
     }
 
     protected String getSubscriberName(ActiveMQDestination dest) {
-        String subscriberName = configuration.getBrokerName() + "_" + dest.getPhysicalName();
+        String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
         return subscriberName;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java?rev=932342&r1=932341&r2=932342&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java Fri Apr  9 10:36:00 2010
@@ -55,11 +55,8 @@ public final class NetworkBridgeFactory 
                                                       final NetworkBridgeListener listener) {
         DemandForwardingBridge result = null;
         if (configuration.isConduitSubscriptions()) {
-            if (configuration.isDynamicOnly()) {
-                result = new ConduitBridge(configuration, localTransport, remoteTransport);
-            } else {
-                result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
-            }
+            // dynamicOnly determines whether durables are auto bridged
+            result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
         } else {
             result = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java?rev=932342&r1=932341&r2=932342&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java Fri Apr  9 10:36:00 2010
@@ -28,8 +28,11 @@ import javax.jms.Session;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.jmx.BrokerView;
 import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class AMQ2439Test extends JmsMultipleBrokersTestSupport {
+    private static final Log LOG = LogFactory.getLog(AMQ2439Test.class);
     Destination dest;
 
     
@@ -45,6 +48,7 @@ public class AMQ2439Test extends JmsMult
        
        assertTrue("dequeue is correct", Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
+               LOG.info("dequeue count (want 1000), is : " + brokerView.getTotalDequeueCount());
                return 1000 == brokerView.getTotalDequeueCount();
            }
        }));

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java?rev=932342&r1=932341&r2=932342&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java Fri Apr  9 10:36:00 2010
@@ -19,11 +19,18 @@ package org.apache.activemq.network;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeNotNull;
 
+import java.io.File;
 import java.net.MalformedURLException;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageListener;
 import javax.jms.Session;
+import javax.jms.TopicSubscriber;
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
@@ -35,6 +42,9 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,28 +54,54 @@ public class NetworkBrokerDetachTest {
 
 	private final static String BROKER_NAME = "broker";
 	private final static String REM_BROKER_NAME = "networkedBroker";
-	private final static String QUEUE_NAME = "testQ";
+	private final static String DESTINATION_NAME = "testQ";
 	private final static int    NUM_CONSUMERS = 1;
 	
     protected static final Log LOG = LogFactory.getLog(NetworkBrokerDetachTest.class);
     protected final int numRestarts = 3;
+    protected final int networkTTL = 2;
+    protected final boolean dynamicOnly = false;
 
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = new BrokerService();
         broker.setBrokerName(BROKER_NAME);
+        configureBroker(broker);
         broker.addConnector("tcp://localhost:61617");
         NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
-        networkConnector.setDuplex(false);
+        configureNetworkConnector(networkConnector);
         return broker;
     }
-    
+
     protected BrokerService createNetworkedBroker() throws Exception {
         BrokerService broker = new BrokerService();
         broker.setBrokerName(REM_BROKER_NAME);
+        configureBroker(broker);
+        broker.getManagementContext().setCreateConnector(false);
         broker.addConnector("tcp://localhost:62617");
+        NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:61617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
+        configureNetworkConnector(networkConnector);
         return broker;
     }
     
+    private void configureNetworkConnector(NetworkConnector networkConnector) {
+        networkConnector.setDuplex(false);
+        networkConnector.setNetworkTTL(networkTTL);
+        networkConnector.setDynamicOnly(dynamicOnly);
+    }
+    
+    // variants for each store....
+    private void configureBroker(BrokerService broker) throws Exception {
+        //KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
+        //persistenceAdapter.setDirectory(new File("target/activemq-data/kaha/" + broker.getBrokerName() + "/NetworBrokerDetatchTest"));
+        //broker.setPersistenceAdapter(persistenceAdapter);        
+        
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/NetworBrokerDetatchTest"));
+        broker.setPersistenceAdapter(persistenceAdapter);
+        
+        // default AMQ
+    }
+
     @Test
     public void testNetworkedBrokerDetach() throws Exception {
         BrokerService broker = createBroker();
@@ -79,29 +115,13 @@ public class NetworkBrokerDetachTest {
         ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
         Connection consConn = consFactory.createConnection();
         Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        
+        ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
         for(int i=0; i<NUM_CONSUMERS; i++) {
-          consSession.createConsumer(consSession.createQueue(QUEUE_NAME));
+            consSession.createConsumer(destination);
         }
         
-        assertTrue("got expected consumer count from mbean within time limit", Wait.waitFor(new Wait.Condition() {
-
-            public boolean isSatisified() throws Exception {
-                boolean result = false;
-                MBeanServerConnection mbsc = getMBeanServerConnection();
-                if (mbsc != null) {                
-                    // We should have 1 consumer for the queue on the local broker
-                    Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
-                    if (consumers != null) {
-                        LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
-                        if (1L == ((Long)consumers).longValue()) {
-                            result = true;
-                        }
-                    }
-                }
-                return result;
-            }      
-        }));
+        assertTrue("got expected consumer count from mbean within time limit", 
+                verifyConsumerCount(1, destination, BROKER_NAME));
         
         
         LOG.info("Stopping Consumer on the networked broker ...");
@@ -109,24 +129,7 @@ public class NetworkBrokerDetachTest {
         consConn.close();
         
         // We should have 0 consumer for the queue on the local broker
-        assertTrue("got expected 0 count from mbean within time limit", Wait.waitFor(new Wait.Condition() {
-
-            public boolean isSatisified() throws Exception {
-                boolean result = false;
-                MBeanServerConnection mbsc = getMBeanServerConnection();
-                if (mbsc != null) {                
-                    // We should have 1 consumer for the queue on the local broker
-                    Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
-                    if (consumers != null) {
-                        LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
-                        if (0L == ((Long)consumers).longValue()) {
-                            result = true;
-                        }
-                    }
-                }
-                return result;
-            }      
-        }));
+        assertTrue("got expected 0 count from mbean within time limit", verifyConsumerCount(0, destination, BROKER_NAME));
         
         networkedBroker.stop();
         networkedBroker.waitUntilStopped();
@@ -134,6 +137,106 @@ public class NetworkBrokerDetachTest {
         broker.waitUntilStopped();
     }
 
+    
+    @Test
+    public void testNetworkedBrokerDurableSubAfterRestart() throws Exception {        
+        BrokerService brokerOne = createBroker();
+        brokerOne.setDeleteAllMessagesOnStartup(true);
+        brokerOne.start();
+
+        BrokerService brokerTwo = createNetworkedBroker();
+        brokerTwo.setDeleteAllMessagesOnStartup(true);
+        brokerTwo.start();
+        
+        final AtomicInteger count = new AtomicInteger(0);
+        MessageListener counter = new MessageListener() {
+            public void onMessage(Message message) {
+                count.incrementAndGet();
+            }
+        };
+        
+        LOG.info("Creating durable consumer on each broker ...");
+        ActiveMQTopic destination = registerDurableConsumer(brokerTwo, counter);
+        registerDurableConsumer(brokerOne, counter);
+        
+        assertTrue("got expected consumer count from local broker mbean within time limit",
+                verifyConsumerCount(2, destination, BROKER_NAME));
+        
+        assertTrue("got expected consumer count from network broker mbean within time limit",
+                verifyConsumerCount(2, destination, REM_BROKER_NAME));
+        
+        sendMessageTo(destination, brokerOne);
+        
+        assertTrue("Got one message on each", verifyMessageCount(2, count));
+        
+        LOG.info("Stopping brokerTwo...");
+        brokerTwo.stop();
+        brokerTwo.waitUntilStopped();           
+        
+        LOG.info("restarting  broker Two...");
+        brokerTwo = createNetworkedBroker();
+        brokerTwo.start();
+   
+        LOG.info("Recreating durable Consumer on the broker after restart...");
+        registerDurableConsumer(brokerTwo, counter);
+        
+        // give advisories a chance to percolate
+        TimeUnit.SECONDS.sleep(5);
+        
+        sendMessageTo(destination, brokerOne);
+        
+        // expect similar after restart
+        assertTrue("got expected consumer count from local broker mbean within time limit",
+                verifyConsumerCount(2, destination, BROKER_NAME));
+ 
+        // a durable sub is auto bridged on restart unless dynamicOnly=true
+        assertTrue("got expected consumer count from network broker mbean within time limit",
+                verifyConsumerCount(2, destination, REM_BROKER_NAME));
+
+        assertTrue("got no inactive subs on broker", verifyDurableConsumerCount(0, BROKER_NAME));
+        assertTrue("got no inactive subs on other broker", verifyDurableConsumerCount(0, REM_BROKER_NAME));
+
+        assertTrue("Got two more messages after restart", verifyMessageCount(4, count));
+        TimeUnit.SECONDS.sleep(1);
+        assertTrue("still Got just two more messages", verifyMessageCount(4, count));
+        
+        brokerTwo.stop();
+        brokerTwo.waitUntilStopped();
+        brokerOne.stop();
+        brokerOne.waitUntilStopped();
+    }
+
+    private boolean verifyMessageCount(final int i, final AtomicInteger count) throws Exception {
+        return Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return i == count.get();
+            }      
+        });
+    }
+
+    private ActiveMQTopic registerDurableConsumer(
+            BrokerService brokerService, MessageListener listener) throws Exception {
+        ConnectionFactory factory = createConnectionFactory(brokerService);
+        Connection connection = factory.createConnection();
+        connection.setClientID("DurableOne");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQTopic destination = (ActiveMQTopic) session.createTopic(DESTINATION_NAME);
+        // unique to a broker
+        TopicSubscriber sub = session.createDurableSubscriber(destination, "SubOne" + brokerService.getBrokerName());
+        sub.setMessageListener(listener);
+        return destination;
+    }
+
+    private void sendMessageTo(ActiveMQTopic destination, BrokerService brokerService) throws Exception {
+        ConnectionFactory factory = createConnectionFactory(brokerService);
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createProducer(destination).send(session.createTextMessage("Hi"));
+        conn.close();
+    }
+    
     protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
         
         String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
@@ -154,6 +257,46 @@ public class NetworkBrokerDetachTest {
     }
     
     // JMX Helper Methods 
+    private boolean verifyConsumerCount(final long expectedCount, final ActiveMQDestination destination, final String brokerName) throws Exception {
+        return Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                boolean result = false;
+                MBeanServerConnection mbsc = getMBeanServerConnection();
+                if (mbsc != null) {                
+                    // We should have 1 consumer for the queue on the local broker
+                    Object consumers = getAttribute(mbsc, brokerName, destination.isQueue() ? "Queue" : "Topic", "Destination=" + destination.getPhysicalName(), "ConsumerCount");
+                    if (consumers != null) {
+                        LOG.info("Consumers for " + destination.getPhysicalName() + " on " + brokerName + " : " + consumers);
+                        if (expectedCount == ((Long)consumers).longValue()) {
+                            result = true;
+                        }
+                    }
+                }
+                return result;
+            }      
+        });
+    }
+    
+    
+    private boolean verifyDurableConsumerCount(final long expectedCount, final String brokerName) throws Exception {
+        return Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                boolean result = false;
+                MBeanServerConnection mbsc = getMBeanServerConnection();
+                if (mbsc != null) {
+                    Set subs = getMbeans(mbsc, brokerName, "Subscription", "active=false,*");
+                    if (subs != null) {
+                        LOG.info("inactive durable subs on " + brokerName + " : " + subs);
+                        if (expectedCount == subs.size()) {
+                            result = true;
+                        }
+                    }
+                }
+                return result;
+            }      
+        });
+    }
+
     
     private MBeanServerConnection getMBeanServerConnection() throws MalformedURLException {
         final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
@@ -171,10 +314,21 @@ public class NetworkBrokerDetachTest {
         return mbsc;
     }
     
-    private Object getAttribute(MBeanServerConnection mbsc, String type, String pattern, String attrName) throws Exception {
+    
+    private Set getMbeans(MBeanServerConnection mbsc, String brokerName, String type, String pattern) throws Exception {
+        Set obj = null;
+        try {
+            obj = mbsc.queryMBeans(getObjectName(brokerName, type, pattern), null);
+        } catch (InstanceNotFoundException ignored) {
+            LOG.warn("getAttribute ex: " + ignored);
+        }
+        return obj;
+    }
+    
+    private Object getAttribute(MBeanServerConnection mbsc, String brokerName, String type, String pattern, String attrName) throws Exception {
         Object obj = null;
         try {
-            obj = mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName);
+            obj = mbsc.getAttribute(getObjectName(brokerName, type, pattern), attrName);
         } catch (InstanceNotFoundException ignored) {
             LOG.warn("getAttribute ex: " + ignored);
         }