You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/08/01 14:03:42 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6383 https://issues.apache.org/jira/browse/AMQ-6373

Repository: activemq
Updated Branches:
  refs/heads/master 71bb54f94 -> e73ab3483


https://issues.apache.org/jira/browse/AMQ-6383
https://issues.apache.org/jira/browse/AMQ-6373

Adding a new flag that can be specified on a network bridge to allow
forcing of subscriptions to be durable.  Cleaned up some unit tests.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e73ab348
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e73ab348
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e73ab348

Branch: refs/heads/master
Commit: e73ab348375db71116624326d29398e0088d7334
Parents: 71bb54f
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Sat Jul 30 10:58:29 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon Aug 1 10:02:56 2016 -0400

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |   1 +
 .../apache/activemq/network/ConduitBridge.java  |   8 +-
 .../network/DemandForwardingBridgeSupport.java  |  55 ++-
 .../activemq/network/DemandSubscription.java    |  13 +
 .../activemq/network/DurableConduitBridge.java  |  51 ++-
 ...ingToListOfActiveMQDestinationConverter.java |  18 +-
 .../network/DurableSyncNetworkBridgeTest.java   | 104 +++---
 .../network/DynamicNetworkTestSupport.java      |  66 +++-
 .../network/ForceDurableNetworkBridgeTest.java  | 351 +++++++++++++++++++
 .../network/VirtualConsumerDemandTest.java      |  38 +-
 10 files changed, 614 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 5288dc9..a05ba8c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -103,6 +103,7 @@ import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
 import org.apache.activemq.util.SubscriptionKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 10df1f9..3c0b85b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -46,7 +46,7 @@ public class ConduitBridge extends DemandForwardingBridge {
 
     @Override
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
-        if (addToAlreadyInterestedConsumers(info)) {
+        if (addToAlreadyInterestedConsumers(info, false)) {
             return null; // don't want this subscription added
         }
         //add our original id to ourselves
@@ -55,7 +55,7 @@ public class ConduitBridge extends DemandForwardingBridge {
         return doCreateDemandSubscription(info);
     }
 
-    protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
+    protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) {
         // search through existing subscriptions and see if we have a match
         if (info.isNetworkSubscription()) {
             return false;
@@ -71,6 +71,10 @@ public class ConduitBridge extends DemandForwardingBridge {
                 // add the interest in the subscription
                 if (!info.isDurable()) {
                     ds.add(info.getConsumerId());
+                    if (isForcedDurable) {
+                        forcedDurableRemoteId.add(info.getConsumerId());
+                        ds.addForcedDurableConsumer(info.getConsumerId());
+                    }
                 } else {
                     ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 865e60e..8a3a56a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -21,9 +21,11 @@ import java.security.GeneralSecurityException;
 import java.security.cert.X509Certificate;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -99,6 +101,7 @@ import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,6 +134,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     protected ActiveMQDestination[] durableDestinations;
     protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
     protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
+    protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
     protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
     protected final CountDownLatch startedLatch = new CountDownLatch(2);
     protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
@@ -549,6 +553,21 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                     // set our properties
                     Properties props = new Properties();
                     IntrospectionSupport.getProperties(configuration, props, null);
+
+                    String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations";
+                    String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations";
+
+                    if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) {
+                        props.put(dynamicallyIncludedDestinationsKey,
+                                StringToListOfActiveMQDestinationConverter.
+                                convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true));
+                    }
+                    if (!configuration.getStaticallyIncludedDestinations().isEmpty()) {
+                        props.put(staticallyIncludedDestinationsKey,
+                                StringToListOfActiveMQDestinationConverter.
+                                convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true));
+                    }
+
                     props.remove("networkTTL");
                     String str = MarshallingSupport.propertiesToString(props);
                     brokerInfo.setNetworkProperties(str);
@@ -858,6 +877,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         } else if (data.getClass() == RemoveInfo.class) {
             ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
             removeDemandSubscription(id);
+
+            if (forcedDurableRemoteId.remove(id)) {
+                for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
+                    DemandSubscription ds = i.next();
+                    boolean removed = ds.removeForcedDurableConsumer(id);
+                    if (removed) {
+                        cleanupDurableSub(ds, i);
+                    }
+                }
+           }
+
         } else if (data.getClass() == RemoveSubscriptionInfo.class) {
             RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
             SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
@@ -873,7 +903,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     private void cleanupDurableSub(final DemandSubscription ds,
             Iterator<DemandSubscription> i) throws IOException {
-        if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()) {
+        if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
+                && ds.getForcedDurableConsumersSize() == 0) {
 
             // deactivate subscriber
             RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
@@ -1196,7 +1227,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
         dests = dynamicallyIncludedDestinations;
         if (dests != null && dests.length > 0) {
-            return matchesDynamicallyIncludedDestinations(destination);
+            for (ActiveMQDestination dest : dests) {
+                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+                    return true;
+                }
+            }
+
+            return false;
         }
 
         return true;
@@ -1216,6 +1254,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return false;
     }
 
+    // Returns the first non-null entry of dests whose filter matches destination and whose type is equal, else null.
+    protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
+        if (dests != null) {
+            for (ActiveMQDestination dest : dests) {
+                // Test for null before handing dest to parseFilter; checking afterwards cannot protect that call.
+                if (dest != null && DestinationFilter.parseFilter(dest).matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+                    return dest;
+                }
+            }
+        }
+        return null;
+    }
+
     /**
      * Subscriptions for these destinations are always created
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
index b53646a..371df0a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
@@ -41,6 +41,7 @@ public class DemandSubscription {
     private final AtomicInteger dispatched = new AtomicInteger(0);
     private final AtomicBoolean activeWaiter = new AtomicBoolean();
     private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
+    private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
     private SubscriptionInfo localDurableSubscriber;
 
     private NetworkBridgeFilter networkBridgeFilter;
@@ -106,6 +107,18 @@ public class DemandSubscription {
         return remoteInfo;
     }
 
+    public boolean addForcedDurableConsumer(ConsumerId id) {
+        return forcedDurableConsumers.add(id);
+    }
+
+    public boolean removeForcedDurableConsumer(ConsumerId id) {
+        return forcedDurableConsumers.remove(id);
+    }
+
+    public int getForcedDurableConsumersSize() {
+        return forcedDurableConsumers.size();
+    }
+
     public void waitForCompletion() {
         if (dispatched.get() > 0) {
             LOG.debug("Waiting for completion for sub: {}, dispatched: {}", localInfo.getConsumerId(), this.dispatched.get());

http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 237e272..e699272 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -17,7 +17,9 @@
 package org.apache.activemq.network;
 
 import java.io.IOException;
+import java.util.Map;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
@@ -26,6 +28,7 @@ import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.TypeConversionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,13 +95,16 @@ public class DurableConduitBridge extends ConduitBridge {
 
     @Override
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
-        if (addToAlreadyInterestedConsumers(info)) {
+        boolean isForcedDurable = isForcedDurable(info);
+
+        if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
             return null; // don't want this subscription added
         }
         //add our original id to ourselves
         info.addNetworkConsumerId(info.getConsumerId());
+        ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null;
 
-        if (info.isDurable()) {
+        if(info.isDurable() || isForcedDurable) {
             // set the subscriber name to something reproducible
             info.setSubscriptionName(getSubscriberName(info.getDestination()));
             // and override the consumerId with something unique so that it won't
@@ -107,7 +113,46 @@ public class DurableConduitBridge extends ConduitBridge {
                                consumerIdGenerator.getNextSequenceId()));
         }
         info.setSelector(null);
-        return doCreateDemandSubscription(info);
+        DemandSubscription demandSubscription = doCreateDemandSubscription(info);
+        if (forcedDurableId != null) {
+            demandSubscription.addForcedDurableConsumer(forcedDurableId);
+            forcedDurableRemoteId.add(forcedDurableId);
+        }
+        return demandSubscription;
+    }
+
+
+    // Decides whether a non-durable consumer must be treated as forced durable by this bridge.
+    private boolean isForcedDurable(ConsumerInfo info) {
+        // Consumers that are already durable never need forcing.
+        if (info.isDurable()) {
+            return false;
+        }
+        final ActiveMQDestination target = info.getDestination();
+        // Advisory topics, temporary destinations and queues are never forced durable.
+        final boolean eligible = !AdvisorySupport.isAdvisoryTopic(target)
+                && !target.isTemporary() && !target.isQueue();
+        if (!eligible) {
+            return false;
+        }
+        // The dynamically included list is consulted first; the static list only when it has no match.
+        ActiveMQDestination configured = findMatchingDestination(dynamicallyIncludedDestinations, target);
+        if (configured == null) {
+            configured = findMatchingDestination(staticallyIncludedDestinations, target);
+        }
+        // Without a matching configured destination there is no forceDurable option to read.
+        return configured != null && isDestForcedDurable(configured);
+    }
+
+    // Reads the "forceDurable" option of the given destination; false when it carries no options.
+    private boolean isDestForcedDurable(ActiveMQDestination destination) {
+        final Map<String, String> destinationOptions = destination.getOptions();
+        if (destinationOptions == null) {
+            return false;
+        }
+        // The raw option value is handed to TypeConversionSupport for conversion to boolean.
+        final Object forced = TypeConversionSupport.convert(destinationOptions.get("forceDurable"), boolean.class);
+        return (boolean) forced;
     }
 
     protected String getSubscriberName(ActiveMQDestination dest) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java b/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java
index 7f58f20..130afa7 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.util;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -59,6 +61,10 @@ public class StringToListOfActiveMQDestinationConverter {
     }
 
     public static String convertFromActiveMQDestination(Object value) {
+        return convertFromActiveMQDestination(value, false);
+    }
+
+    public static String convertFromActiveMQDestination(Object value, boolean includeOptions) {
         if (value == null) {
             return null;
         }
@@ -70,7 +76,17 @@ public class StringToListOfActiveMQDestinationConverter {
                 Object e = list.get(i);
                 if (e instanceof ActiveMQDestination) {
                     ActiveMQDestination destination = (ActiveMQDestination) e;
-                    sb.append(destination);
+                    if (includeOptions && destination.getOptions() != null) {
+                        try {
+                            //Reapply the options as URI parameters
+                            sb.append(destination.toString() + URISupport.applyParameters(
+                                new URI(""), destination.getOptions()));
+                        } catch (URISyntaxException e1) {
+                            sb.append(destination);
+                        }
+                    } else {
+                        sb.append(destination);
+                    }
                     if (i < list.size() - 1) {
                         sb.append(", ");
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 1390d05..3c4a2a0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.network;
 
-import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.net.URI;
 import java.util.Arrays;
@@ -30,12 +28,10 @@ import javax.jms.Session;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.util.Wait;
 import org.apache.activemq.util.Wait.Condition;
 import org.junit.After;
@@ -98,12 +94,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         sub1.close();
 
         assertSubscriptionsCount(broker1, topic, 1);
-        assertNCSubscriptionsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
 
         removeSubscription(broker1, topic, subName);
 
         assertSubscriptionsCount(broker1, topic, 0);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
 
     }
 
@@ -114,17 +110,17 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         sub1.close();
 
         assertSubscriptionsCount(broker1, topic, 1);
-        assertNCSubscriptionsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
 
         restartBrokers(true);
 
         assertSubscriptionsCount(broker1, topic, 1);
-        assertNCSubscriptionsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
 
         removeSubscription(broker1, topic, subName);
 
         assertSubscriptionsCount(broker1, topic, 0);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
 
     }
 
@@ -135,7 +131,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         sub1.close();
 
         assertSubscriptionsCount(broker1, topic, 1);
-        assertNCSubscriptionsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
 
         doTearDown();
         restartBroker(broker1, false);
@@ -146,9 +142,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         //Test that on successful reconnection of the bridge that
         //the NC sub will be removed
         restartBroker(broker2, true);
-        assertNCSubscriptionsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
         restartBroker(broker1, true);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
 
     }
 
@@ -160,7 +156,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         sub1.close();
 
         assertSubscriptionsCount(broker1, topic, 1);
-        assertNCSubscriptionsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
 
         doTearDown();
         restartBroker(broker1, false);
@@ -176,13 +172,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
 
         //before sync, the old NC should exist
         restartBroker(broker2, true);
-        assertNCSubscriptionsCount(broker2, topic, 1);
-        assertNCSubscriptionsCount(broker2, topic2, 0);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic2, 0);
 
         //After sync, remove old NC and create one for topic 2
         restartBroker(broker1, true);
-        assertNCSubscriptionsCount(broker2, topic, 0);
-        assertNCSubscriptionsCount(broker2, topic2, 1);
+        assertNCDurableSubsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic2, 1);
 
     }
 
@@ -193,7 +189,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
 
         assertSubscriptionsCount(broker1, topic, 0);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
 
         doTearDown();
         restartBroker(broker1, false);
@@ -207,9 +203,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         assertSubscriptionsCount(broker1, topic2, 1);
 
         restartBrokers(true);
-        assertNCSubscriptionsCount(broker2, topic, 1);
-        assertNCSubscriptionsCount(broker2, topic2, 1);
-        assertNCSubscriptionsCount(broker2, excludeTopic, 0);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic2, 1);
+        assertNCDurableSubsCount(broker2, excludeTopic, 0);
 
     }
 
@@ -223,7 +219,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
 
         assertSubscriptionsCount(broker1, topic, 0);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
 
         doTearDown();
         restartBroker(broker1, false);
@@ -235,7 +231,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         //Since we are using an old version of openwire, the NC should
         //not be added
         restartBrokers(true);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
 
     }
 
@@ -246,7 +242,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
 
         assertSubscriptionsCount(broker1, topic, 0);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
 
         doTearDown();
         restartBroker(broker1, false);
@@ -256,7 +252,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         assertSubscriptionsCount(broker1, topic, 1);
 
         restartBrokers(true);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
     }
 
     @Test
@@ -266,7 +262,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
 
         assertSubscriptionsCount(broker1, topic, 0);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
 
         doTearDown();
         restartBroker(broker1, false);
@@ -276,10 +272,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         assertSubscriptionsCount(broker1, topic, 1);
 
         restartBrokers(true);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
         //bring online again
         session1.createDurableSubscriber(topic, subName);
-        assertNCSubscriptionsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
 
     }
 
@@ -290,7 +286,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
 
         session1.createDurableSubscriber(topic, subName).close();
         assertSubscriptionsCount(broker1, topic, 1);
-        assertNCSubscriptionsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
 
         doTearDown();
         restartBroker(broker1, false);
@@ -301,8 +297,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         assertSubscriptionsCount(broker1, topic, 1);
 
         restartBrokers(true);
-        assertNCSubscriptionsCount(broker2, topic, 1);
-        assertNCSubscriptionsCount(broker2, excludeTopic, 0);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, excludeTopic, 0);
 
     }
 
@@ -314,7 +310,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
 
         assertSubscriptionsCount(broker1, topic, 0);
-        assertNCSubscriptionsCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
 
         doTearDown();
         restartBrokers(false);
@@ -342,32 +338,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         session1.createDurableSubscriber(excludeTopic, "sub-exclude");
 
         Thread.sleep(1000);
-        assertNCSubscriptionsCount(broker2, topic, 1);
-        assertNCSubscriptionsCount(broker2, excludeTopic, 0);
-
-    }
-
-    protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic,
-            final String subName) throws Exception {
-        final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
-        info.setClientId(clientId);
-        info.setSubscriptionName(subName);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, excludeTopic, 0);
 
-        final ConnectionContext context = new ConnectionContext();
-        context.setBroker(brokerService.getBroker());
-        context.setClientId(clientId);
-
-        brokerService.getBroker().removeSubscription(context, info);
-    }
-
-    protected void assertSubscriptionsCount(final BrokerService brokerService,
-            final ActiveMQTopic dest, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getSubscriptions(brokerService, dest).size();
-            }
-        }, 10000, 500));
     }
 
     protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception {
@@ -387,7 +360,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
     protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir,
             File remoteDataDir) throws Exception {
         included = new ActiveMQTopic(testTopicName);
-        doSetUpRemoteBroker(deleteAllMessages, remoteDataDir);
+        doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
         doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir);
         //Give time for advisories to propagate
         Thread.sleep(1000);
@@ -399,8 +372,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
     }
 
     protected void restartRemoteBroker() throws Exception {
+        int port = 0;
+        if (remoteBroker != null) {
+            List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+            port = transportConnectors.get(0).getConnectUri().getPort();
+        }
         stopRemoteBroker();
-        doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile());
+        doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile(), port);
     }
 
     protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector,
@@ -438,8 +416,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         }
     }
 
-    protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir) throws Exception {
-        remoteBroker = createRemoteBroker(dataDir);
+    protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception {
+        remoteBroker = createRemoteBroker(dataDir, port);
         remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         remoteBroker.start();
         remoteBroker.waitUntilStarted();
@@ -494,7 +472,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
 
     protected AdvisoryBroker remoteAdvisoryBroker;
 
-    protected BrokerService createRemoteBroker(File dataDir) throws Exception {
+    protected BrokerService createRemoteBroker(File dataDir, int port) throws Exception {
         BrokerService brokerService = new BrokerService();
         brokerService.setBrokerName("remoteBroker");
         brokerService.setUseJmx(false);
@@ -502,7 +480,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
 
         remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
 
-        brokerService.addConnector("tcp://localhost:0?wireFormat.version=" + remoteBrokerWireFormatVersion);
+        brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.version=" + remoteBrokerWireFormatVersion);
 
         return brokerService;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 29b726a..0b388cc 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -30,8 +30,11 @@ import javax.jms.Session;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -96,24 +99,24 @@ public abstract class DynamicNetworkTestSupport {
     }
 
     protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception {
-        Wait.waitFor(new Wait.Condition() {
+        assertTrue(Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                 //should only be 1 for the composite destination creation
                 return count == destinationStatistics.getConsumers().getCount();
             }
-        });
+        }));
     }
 
     protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception {
-        Wait.waitFor(new Wait.Condition() {
+        assertTrue(Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                 return count == destinationStatistics.getDequeues().getCount() &&
                        count == destinationStatistics.getDispatched().getCount() &&
                        count == destinationStatistics.getForwards().getCount();
             }
-        });
+        }));
     }
 
     protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) {
@@ -126,16 +129,32 @@ public abstract class DynamicNetworkTestSupport {
         MessageConsumer createConsumer() throws JMSException;
     }
 
-    protected void assertNCSubscriptionsCount(final BrokerService brokerService,
+    protected void assertNCDurableSubsCount(final BrokerService brokerService,
             final ActiveMQTopic dest, final int count) throws Exception {
         assertTrue(Wait.waitFor(new Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-                return count == getNCSubscriptions(brokerService, dest).size();
+                return count == getNCDurableSubs(brokerService, dest).size();
             }
         }, 10000, 500));
     }
 
+    protected void assertConsumersCount(final BrokerService brokerService,
+            final ActiveMQTopic dest, final int count) throws Exception {
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return count == getConsumers(brokerService, dest).size();
+            }
+        }, 10000, 500));
+    }
+
+    /** Returns the consumers of {@code dest}, unwrapping a DestinationFilter first (as getNCDurableSubs does) instead of blindly casting to Topic. */
+    protected List<Subscription> getConsumers(final BrokerService brokerService, final ActiveMQTopic dest) throws Exception {
+        final Destination d = brokerService.getDestination(dest);
+        return (d instanceof DestinationFilter ? ((DestinationFilter) d).getAdaptor(Topic.class) : (Topic) d).getConsumers();
+    }
+
     protected List<DurableTopicSubscription> getSubscriptions(final BrokerService brokerService,
             final ActiveMQTopic dest) throws Exception {
         List<DurableTopicSubscription> subs = new ArrayList<>();
@@ -151,10 +170,17 @@ public abstract class DynamicNetworkTestSupport {
         return subs;
     }
 
-    protected List<DurableTopicSubscription> getNCSubscriptions(final BrokerService brokerService,
+    protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
             final ActiveMQTopic dest) throws Exception {
         List<DurableTopicSubscription> subs = new ArrayList<>();
-        Topic destination = (Topic) brokerService.getDestination(dest);
+        Destination d = brokerService.getDestination(dest);
+        Topic destination = null;
+        if (d instanceof DestinationFilter){
+            destination = ((DestinationFilter) d).getAdaptor(Topic.class);
+        } else {
+            destination = (Topic) d;
+        }
+
         for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
             if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
                 DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
@@ -166,4 +192,28 @@ public abstract class DynamicNetworkTestSupport {
         return subs;
     }
 
+    protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic,
+            final String subName) throws Exception {
+        final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
+        info.setClientId(clientId);
+        info.setSubscriptionName(subName);
+
+        final ConnectionContext context = new ConnectionContext();
+        context.setBroker(brokerService.getBroker());
+        context.setClientId(clientId);
+
+        brokerService.getBroker().removeSubscription(context, info);
+    }
+
+    protected void assertSubscriptionsCount(final BrokerService brokerService,
+            final ActiveMQTopic dest, final int count) throws Exception {
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return count == getSubscriptions(brokerService, dest).size();
+            }
+        }, 10000, 500));
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
new file mode 100644
index 0000000..770ba6e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(ForceDurableNetworkBridgeTest.class);
+
+    protected String testTopicName2 = "include.nonforced.bar";
+    protected String staticTopic = "include.static.bar";
+    protected String staticTopic2 = "include.static.nonforced.bar";
+    public static enum FLOW {FORWARD, REVERSE};
+    private BrokerService broker1;
+    private BrokerService broker2;
+    private Session session1;
+    private final FLOW flow;
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {FLOW.FORWARD},
+                {FLOW.REVERSE}
+        });
+    }
+
+    public ForceDurableNetworkBridgeTest(final FLOW flow) {
+        this.flow = flow;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        doSetUp(true, tempFolder.newFolder(), tempFolder.newFolder());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        doTearDown();
+    }
+
+    @Test
+    public void testForceDurableSubscriptionStatic() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(staticTopic);
+        // staticTopic is statically included with ?forceDurable=true, so this test creates no client consumer.
+        assertNCDurableSubsCount(broker2, topic, 1);
+        assertConsumersCount(broker2, topic, 1);
+
+        // Static demand should stick around: a second check must still see the same durable sub and consumer.
+        assertNCDurableSubsCount(broker2, topic, 1);
+        assertConsumersCount(broker2, topic, 1);
+    }
+
+    @Test
+    public void testConsumerNotForceDurableSubscriptionStatic() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(staticTopic2);
+
+        assertConsumersCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 0);
+    }
+
+    @Test
+    public void testConsumerNotForceDurableSubscription() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName2);
+        MessageConsumer sub1 = session1.createConsumer(topic);
+
+        assertConsumersCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 0);
+        sub1.close();
+
+        assertNCDurableSubsCount(broker2, topic, 0);
+        assertConsumersCount(broker2, topic, 0);
+    }
+
+    @Test
+    public void testConsumerNotForceDurableWithAnotherDurable() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName2);
+        TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
+        session1.createConsumer(topic);
+
+        // Conduit subscriptions are enabled, so both subscribers show up as a single network consumer,
+        // and the durable subscriber shows up as one network durable subscription.
+        assertConsumersCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        // Close the durable subscriber, pause (presumably so the close propagates), then unsubscribe it on broker1.
+        durSub.close();
+        Thread.sleep(1000);
+        removeSubscription(broker1, topic, subName);
+
+        // The network durable must be gone even though a non-durable consumer is still open,
+        // because testTopicName2 is not configured with forceDurable.
+        assertNCDurableSubsCount(broker2, topic, 0);
+        // The consumer count also drops to 0 here despite the remaining non-durable consumer:
+        // the durable sub is destroyed and, being a conduit subscription, takes the shared demand with it.
+        // This is another good reason to enable forcing of durables.
+        assertConsumersCount(broker2, topic, 0);
+    }
+
+    @Test
+    public void testForceDurableSubscription() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+        MessageConsumer sub1 = session1.createConsumer(topic);
+
+        assertNCDurableSubsCount(broker2, topic, 1);
+        assertConsumersCount(broker2, topic, 1);
+        sub1.close();
+
+        assertNCDurableSubsCount(broker2, topic, 0);
+        assertConsumersCount(broker2, topic, 0);
+    }
+
+    @Test
+    public void testForceDurableMultiSubscriptions() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+        MessageConsumer sub1 = session1.createConsumer(topic);
+        MessageConsumer sub2 = session1.createConsumer(topic);
+        MessageConsumer sub3 = session1.createConsumer(topic);
+
+        assertNCDurableSubsCount(broker2, topic, 1);
+        assertConsumersCount(broker2, topic, 1);
+        sub1.close();
+        sub2.close();
+
+        assertNCDurableSubsCount(broker2, topic, 1);
+        assertConsumersCount(broker2, topic, 1);
+
+        sub3.close();
+
+        assertNCDurableSubsCount(broker2, topic, 0);
+        assertConsumersCount(broker2, topic, 0);
+    }
+
+    @Test
+    public void testForceDurableSubWithDurableCreatedFirst() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+        TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
+        durSub.close();
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        MessageConsumer sub1 = session1.createConsumer(topic);
+        Thread.sleep(1000);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        sub1.close();
+
+        Thread.sleep(1000);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        removeSubscription(broker1, topic, subName);
+        assertNCDurableSubsCount(broker2, topic, 0);
+    }
+
+    @Test
+    public void testForceDurableSubWithNonDurableCreatedFirst() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+        MessageConsumer sub1 = session1.createConsumer(topic);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
+        durSub.close();
+        Thread.sleep(1000);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        removeSubscription(broker1, topic, subName);
+        Thread.sleep(1000);
+        assertConsumersCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        sub1.close();
+        assertNCDurableSubsCount(broker2, topic, 0);
+    }
+
+    @Test
+    public void testDurableSticksAroundOnConsumerClose() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+        //Create the normal consumer first
+        MessageConsumer sub1 = session1.createConsumer(topic);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
+        durSub.close();
+        sub1.close();
+        Thread.sleep(1000);
+        //Both consumer and durable are closed but the durable should stick around
+        assertConsumersCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        removeSubscription(broker1, topic, subName);
+        assertConsumersCount(broker2, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
+    }
+
+    protected void restartBrokers() throws Exception {
+        doTearDown();
+        doSetUp(false, localBroker.getDataDirectoryFile(), remoteBroker.getDataDirectoryFile());
+    }
+
+    protected void doSetUp(boolean deleteAllMessages, File localDataDir,
+            File remoteDataDir) throws Exception {
+        included = new ActiveMQTopic(testTopicName);
+        doSetUpRemoteBroker(deleteAllMessages, remoteDataDir);
+        doSetUpLocalBroker(deleteAllMessages, localDataDir);
+        //Give time for advisories to propagate
+        Thread.sleep(1000);
+    }
+
+    /** Starts the local (bridging) broker, opens its client session and fails fast if the network bridge never connects. */
+    protected void doSetUpLocalBroker(boolean deleteAllMessages, File dataDir) throws Exception {
+        localBroker = createLocalBroker(dataDir);
+        localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localBroker.getVmConnectorURI());
+        fac.setAlwaysSyncSend(true);
+        fac.setDispatchAsync(false);
+        localConnection = fac.createConnection();
+        localConnection.setClientID("clientId");
+        localConnection.start();
+        // The wait result used to be discarded; assert it so a bridge that never starts is reported here.
+        org.junit.Assert.assertTrue("network bridge was not established", Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
+            }
+        }, 10000, 500));
+
+        localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        // FORWARD: the local broker is broker1 and its session creates the consumers. REVERSE: it is broker2 (asserted on).
+        if (flow.equals(FLOW.FORWARD)) {
+            broker1 = localBroker;
+            session1 = localSession;
+        } else {
+            broker2 = localBroker;
+        }
+    }
+
+    protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir) throws Exception {
+        remoteBroker = createRemoteBroker(dataDir);
+        remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+        URI remoteURI = remoteBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(remoteURI);
+        remoteConnection = fac.createConnection();
+        remoteConnection.setClientID("clientId");
+        remoteConnection.start();
+        remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        // FORWARD: the remote broker is broker2 (asserted on). REVERSE: it is broker1 and its session creates the consumers.
+        if (flow.equals(FLOW.FORWARD)) {
+            broker2 = remoteBroker;
+        } else {
+            broker1 = remoteBroker;
+            session1 = remoteSession;
+        }
+    }
+
+    protected BrokerService createLocalBroker(File dataDir) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setMonitorConnectionSplits(true);
+        brokerService.setDataDirectoryFile(dataDir);
+        brokerService.setBrokerName("localBroker");
+        brokerService.addNetworkConnector(configureLocalNetworkConnector());
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.setDestinations(new ActiveMQDestination[] {
+                new ActiveMQTopic(testTopicName),
+                new ActiveMQTopic(testTopicName2),
+                new ActiveMQTopic(excludeTopicName)});
+
+        return brokerService;
+    }
+
+    protected NetworkConnector configureLocalNetworkConnector() throws Exception {
+        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+        URI remoteURI = transportConnectors.get(0).getConnectUri();
+        String uri = "static:(" + remoteURI + ")";
+        NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
+        connector.setName("networkConnector");
+        connector.setDynamicOnly(false);
+        connector.setDecreaseNetworkConsumerPriority(false);
+        connector.setConduitSubscriptions(true);
+        connector.setDuplex(true);
+        connector.setStaticBridge(false);
+        connector.setStaticallyIncludedDestinations(Lists.<ActiveMQDestination>newArrayList(
+                new ActiveMQTopic(staticTopic + "?forceDurable=true"),
+                new ActiveMQTopic(staticTopic2)));
+        connector.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination>newArrayList(
+                        new ActiveMQTopic("include.test.>?forceDurable=true"),
+                        new ActiveMQTopic(testTopicName2)));
+        connector.setExcludedDestinations(
+                Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(excludeTopicName)));
+        return connector;
+    }
+
+
+    protected BrokerService createRemoteBroker(File dataDir) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setBrokerName("remoteBroker");
+        brokerService.setUseJmx(false);
+        brokerService.setDataDirectoryFile(dataDir);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.setDestinations(new ActiveMQDestination[] {
+                new ActiveMQTopic(testTopicName),
+                new ActiveMQTopic(testTopicName2),
+                new ActiveMQTopic(excludeTopicName)});
+
+        return brokerService;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index f474929..782f53f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -25,6 +25,7 @@ import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
@@ -39,6 +40,7 @@ import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.virtual.CompositeQueue;
@@ -240,9 +242,18 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
      */
     @Test(timeout = 60 * 1000)
     public void testDynamicFlow() throws Exception {
+        testDynamicFlow(false);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testDynamicFlowForceDurable() throws Exception {
+        testDynamicFlow(true);
+    }
+
+    protected void testDynamicFlow(boolean forceDurable) throws Exception {
         Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
 
-        doSetUp(true, null);
+        doSetUp(true, null, true, forceDurable);
 
         MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
 
@@ -262,6 +273,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
                 new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
 
         waitForConsumerCount(destinationStatistics, 1);
+        assertNCDurableSubsCount(localBroker, included, forceDurable ? 1 : 0);
         includedProducer.send(test);
 
         waitForDispatchFromLocalBroker(destinationStatistics, 1);
@@ -272,7 +284,6 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
         assertAdvisoryBrokerCounts(1,1,1);
     }
 
-
     /**
      * Test that dynamic flow works for virtual destinations when a second composite
      * topic is included that forwards to the same queue, but is excluded from
@@ -1006,7 +1017,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
         CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
                 new ActiveMQQueue("include.test.bar.bridge"));
 
-        doSetUp(true, new VirtualDestination[] {compositeTopic}, false);
+        doSetUp(true, new VirtualDestination[] {compositeTopic}, false, false);
 
         MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
 
@@ -1034,7 +1045,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
         CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
                 new ActiveMQQueue("include.test.bar.bridge"));
 
-        doSetUp(true, new VirtualDestination[] {compositeTopic}, false);
+        doSetUp(true, new VirtualDestination[] {compositeTopic}, false, false);
 
         MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
 
@@ -1291,16 +1302,16 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
 
     protected void doSetUp(boolean deleteAllMessages,
             VirtualDestination[] remoteVirtualDests) throws Exception {
-        doSetUp(deleteAllMessages, remoteVirtualDests, true);
+        doSetUp(deleteAllMessages, remoteVirtualDests, true, false);
     }
 
     protected void doSetUp(boolean deleteAllMessages,
-            VirtualDestination[] remoteVirtualDests, boolean startNetworkConnector) throws Exception {
+            VirtualDestination[] remoteVirtualDests, boolean startNetworkConnector, boolean forceDurable) throws Exception {
         remoteBroker = createRemoteBroker(isUseVirtualDestSubsOnCreation, remoteVirtualDests);
         remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         remoteBroker.start();
         remoteBroker.waitUntilStarted();
-        localBroker = createLocalBroker(startNetworkConnector);
+        localBroker = createLocalBroker(startNetworkConnector, forceDurable);
         localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         localBroker.start();
         localBroker.waitUntilStarted();
@@ -1324,13 +1335,16 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
 
 
     protected NetworkConnector connector;
-    protected BrokerService createLocalBroker(boolean startNetworkConnector) throws Exception {
+    protected BrokerService createLocalBroker(boolean startNetworkConnector, boolean forceDurable) throws Exception {
         BrokerService brokerService = new BrokerService();
         brokerService.setMonitorConnectionSplits(true);
         brokerService.setDataDirectoryFile(tempFolder.newFolder());
         brokerService.setBrokerName("localBroker");
 
-        connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61617)"));
+        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+        URI remoteURI = transportConnectors.get(0).getConnectUri();
+        String uri = "static:(" + remoteURI + ")";
+        connector = new DiscoveryNetworkConnector(new URI(uri));
         connector.setName("networkConnector");
         connector.setDynamicOnly(false);
         connector.setDecreaseNetworkConsumerPriority(false);
@@ -1338,7 +1352,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
         connector.setDuplex(isDuplex);
         connector.setUseVirtualDestSubs(true);
         connector.setDynamicallyIncludedDestinations(Lists.newArrayList(new ActiveMQQueue(testQueueName),
-                new ActiveMQTopic(testTopicName), new ActiveMQTopic("VirtualTopic.>")));
+                new ActiveMQTopic(testTopicName + (forceDurable ? "?forceDurable=true" : "")), new ActiveMQTopic("VirtualTopic.>")));
         connector.setExcludedDestinations(Lists.newArrayList(new ActiveMQQueue("exclude.test.foo"),
                 new ActiveMQTopic("exclude.test.bar")));
 
@@ -1346,7 +1360,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
             brokerService.addNetworkConnector(connector);
         }
 
-        brokerService.addConnector("tcp://localhost:61616");
+        brokerService.addConnector("tcp://localhost:0");
 
         return brokerService;
     }
@@ -1374,7 +1388,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
         remoteAdvisoryBroker = (AdvisoryBroker)
                 brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
 
-        brokerService.addConnector("tcp://localhost:61617");
+        brokerService.addConnector("tcp://localhost:0");
 
         return brokerService;
     }