You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/10/20 17:47:49 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6472

Repository: activemq
Updated Branches:
  refs/heads/master 52ab6ba09 -> d206621a7


https://issues.apache.org/jira/browse/AMQ-6472

Durable sync over a network bridge will now also sync non-durable
subscriptions proplrly if the consumer belongs to a destination that is
configured to force network durable subscriptions.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d206621a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d206621a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d206621a

Branch: refs/heads/master
Commit: d206621a73e4731b00eff49518ea93ba9a4ee3c0
Parents: 52ab6ba
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Oct 20 13:46:27 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Oct 20 13:47:38 2016 -0400

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       |   4 +
 .../activemq/broker/TransportConnection.java    |  30 +--
 .../network/DemandForwardingBridgeSupport.java  |  40 +---
 .../activemq/network/DurableConduitBridge.java  |  41 +----
 .../activemq/util/NetworkBridgeUtils.java       | 184 +++++++++++++++++++
 .../network/DurableSyncNetworkBridgeTest.java   | 138 +++++++++++++-
 6 files changed, 344 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 5ac201e..556c149 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -893,6 +893,10 @@ public class AdvisoryBroker extends BrokerFilter {
         return destinations;
     }
 
+    public ConcurrentMap<ConsumerInfo, VirtualDestination> getVirtualDestinationConsumers() {
+        return virtualDestinationConsumers;
+    }
+
     private class VirtualConsumerPair {
         private final VirtualDestination virtualDestination;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index a05ba8c..a32d4f6 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -20,14 +20,15 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.net.SocketException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -39,10 +40,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.transaction.xa.XAResource;
 
+import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerInfo;
@@ -103,7 +106,7 @@ import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
-import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
+import org.apache.activemq.util.NetworkBridgeUtils;
 import org.apache.activemq.util.SubscriptionKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1378,23 +1381,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
         this.pendingStop = pendingStop;
     }
 
-    public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService) {
-        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
-        TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
-        List<ConsumerInfo> subscriptionInfos = new ArrayList<>();
-        for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) {
-            DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key);
-            if (sub != null) {
-                ConsumerInfo ci = sub.getConsumerInfo().copy();
-                ci.setClientId(key.getClientId());
-                subscriptionInfos.add(ci);
-            }
-        }
-        BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName());
-        bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0]));
-        return bsi;
-    }
-
     private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {
         Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
         Map<String, String> props = createMap(properties);
@@ -1412,7 +1398,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 NetworkBridgeConfiguration config = getNetworkConfiguration(info);
                 if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
                     LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
-                    dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService()));
+                    dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
                 }
             } catch (Exception e) {
                 LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
@@ -1425,9 +1411,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 NetworkBridgeConfiguration config = getNetworkConfiguration(info);
                 config.setBrokerName(broker.getBrokerName());
 
-                if (config.isSyncDurableSubs() && protocolVersion.get() >= 12) {
+                if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
                     LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
-                    dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService()));
+                    dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
                 }
 
                 // check for existing duplex connection hanging about

http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index b58259d..a8c45b0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -99,6 +99,7 @@ import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.util.NetworkBridgeUtils;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
@@ -575,7 +576,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                     remoteBroker.oneway(brokerInfo);
                     if (configuration.isSyncDurableSubs() &&
                             remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
-                        remoteBroker.oneway(TransportConnection.getBrokerSubscriptionInfo(brokerService));
+                        remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
+                                configuration));
                     }
                 }
                 if (remoteConnectionInfo != null) {
@@ -656,8 +658,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                         if (started.get()) {
                             if (subInfo.getSubscriptionInfos() != null) {
                                 for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
-                                    if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
-                                            matchesDynamicallyIncludedDestinations(info.getDestination())) {
+                                    //re-add any process any non-NC consumers that match the
+                                    //dynamicallyIncludedDestinations list
+                                    if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
+                                            NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
                                         serviceRemoteConsumerAdvisory(info);
                                     }
                                 }
@@ -666,7 +670,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                             //After re-added, clean up any empty durables
                             for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                                 DemandSubscription ds = i.next();
-                                if (matchesDynamicallyIncludedDestinations(ds.getLocalInfo().getDestination())) {
+                                if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
                                     cleanupDurableSub(ds, i);
                                 }
                             }
@@ -907,7 +911,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             Iterator<DemandSubscription> i) throws IOException {
         if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
                 && ds.getForcedDurableConsumersSize() == 0) {
-
             // deactivate subscriber
             RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
             localBroker.oneway(removeInfo);
@@ -1245,33 +1248,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return true;
     }
 
-    private boolean matchesDynamicallyIncludedDestinations(ActiveMQDestination destination) {
-        ActiveMQDestination[] dests = dynamicallyIncludedDestinations;
-        if (dests != null && dests.length > 0) {
-            for (ActiveMQDestination dest : dests) {
-                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
-                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
-                    return true;
-                }
-            }
-        }
-
-        return false;
-    }
-
-    protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
-        if (dests != null && dests.length > 0) {
-            for (ActiveMQDestination dest : dests) {
-                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
-                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
-                    return dest;
-                }
-            }
-        }
-
-        return null;
-    }
-
     /**
      * Subscriptions for these destinations are always created
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 969c386..50c9855 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -17,9 +17,7 @@
 package org.apache.activemq.network;
 
 import java.io.IOException;
-import java.util.Map;
 
-import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
@@ -29,7 +27,7 @@ import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.TypeConversionSupport;
+import org.apache.activemq.util.NetworkBridgeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,7 +118,8 @@ public class DurableConduitBridge extends ConduitBridge {
 
     @Override
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
-        boolean isForcedDurable = isForcedDurable(info);
+        boolean isForcedDurable = NetworkBridgeUtils.isForcedDurable(info,
+                dynamicallyIncludedDestinations, staticallyIncludedDestinations);
 
         if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
             return null; // don't want this subscription added
@@ -146,40 +145,6 @@ public class DurableConduitBridge extends ConduitBridge {
         return demandSubscription;
     }
 
-
-    private boolean isForcedDurable(ConsumerInfo info) {
-        if (info.isDurable()) {
-            return false;
-        }
-
-        ActiveMQDestination destination = info.getDestination();
-        if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
-                destination.isQueue()) {
-            return false;
-        }
-
-        ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
-        if (matching != null) {
-            return isDestForcedDurable(matching);
-        }
-        matching = findMatchingDestination(staticallyIncludedDestinations, destination);
-        if (matching != null) {
-            return isDestForcedDurable(matching);
-        }
-        return false;
-    }
-
-    private boolean isDestForcedDurable(ActiveMQDestination destination) {
-        final Map<String, String> options = destination.getOptions();
-
-        boolean isForceDurable = false;
-        if (options != null) {
-            isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
-        }
-
-        return isForceDurable;
-    }
-
     protected String getSubscriberName(ActiveMQDestination dest) {
         String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
         return subscriberName;

http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java b/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
new file mode 100644
index 0000000..700baf6
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicRegion;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerSubscriptionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetworkBridgeUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeUtils.class);
+
+    /**
+     * Generate the BrokerSubscriptionInfo which is used to tell the broker on the other
+     * side of the network bridge which NC durable subscriptions are still needed for demand.
+     * @param brokerService
+     * @param config
+     * @return
+     */
+    public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService,
+            final NetworkBridgeConfiguration config) {
+
+        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
+        TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+        Set<ConsumerInfo> subscriptionInfos = new HashSet<>();
+
+        //Add all durable subscriptions to the set that match the network config
+        //which currently is just the dynamicallyIncludedDestinations list
+        for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) {
+            DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key);
+            if (sub != null && NetworkBridgeUtils.matchesNetworkConfig(config, sub.getConsumerInfo().getDestination())) {
+                ConsumerInfo ci = sub.getConsumerInfo().copy();
+                ci.setClientId(key.getClientId());
+                subscriptionInfos.add(ci);
+            }
+        }
+
+        //We also need to iterate over all normal subscriptions and check if they are part of
+        //any dynamicallyIncludedDestination that is configured with forceDurable to be true
+        //over the network bridge.  If forceDurable is true then we want to add the consumer to the set
+        for (Subscription sub : topicRegion.getSubscriptions().values()) {
+            if (sub != null && NetworkBridgeUtils.isForcedDurable(sub.getConsumerInfo(),
+                    config.getDynamicallyIncludedDestinations())) {
+                subscriptionInfos.add(sub.getConsumerInfo().copy());
+            }
+        }
+
+        try {
+            //Lastly, if isUseVirtualDestSubs is configured on this broker (to fire advisories) and
+            //configured on the network connector (to listen to advisories) then also add any virtual
+            //dest subscription to the set if forceDurable is true for its destination
+            AdvisoryBroker ab = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
+            if (ab != null && brokerService.isUseVirtualDestSubs() && config.isUseVirtualDestSubs()) {
+                for (ConsumerInfo info : ab.getVirtualDestinationConsumers().keySet()) {
+                    if (NetworkBridgeUtils.isForcedDurable(info, config.getDynamicallyIncludedDestinations())) {
+                        subscriptionInfos.add(info.copy());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Error processing virtualDestinationSubs for BrokerSubscriptionInfo");
+            LOG.debug("Error processing virtualDestinationSubs for BrokerSubscriptionInfo", e);
+        }
+        BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName());
+        bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0]));
+        return bsi;
+    }
+
+    public static boolean isForcedDurable(final ConsumerInfo info,
+            final List<ActiveMQDestination> dynamicallyIncludedDestinations) {
+        return dynamicallyIncludedDestinations != null
+                ? isForcedDurable(info,
+                        dynamicallyIncludedDestinations.toArray(new ActiveMQDestination[0]), null) : false;
+    }
+
+    public static boolean isForcedDurable(final ConsumerInfo info,
+            final ActiveMQDestination[] dynamicallyIncludedDestinations,
+            final ActiveMQDestination[] staticallyIncludedDestinations) {
+
+        if (info.isDurable() || info.getDestination().isQueue()) {
+            return false;
+        }
+
+        ActiveMQDestination destination = info.getDestination();
+        if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
+                destination.isQueue()) {
+            return false;
+        }
+
+        ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
+        if (matching != null) {
+            return isDestForcedDurable(matching);
+        }
+        matching = findMatchingDestination(staticallyIncludedDestinations, destination);
+        if (matching != null) {
+            return isDestForcedDurable(matching);
+        }
+        return false;
+    }
+
+    public static boolean matchesNetworkConfig(final NetworkBridgeConfiguration config,
+            ActiveMQDestination destination) {
+        List<ActiveMQDestination> includedDests = config.getDynamicallyIncludedDestinations();
+        if (includedDests != null && includedDests.size() > 0) {
+            for (ActiveMQDestination dest : includedDests) {
+                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    public static boolean matchesDestinations(ActiveMQDestination[] dests, final ActiveMQDestination destination) {
+        if (dests != null && dests.length > 0) {
+            for (ActiveMQDestination dest : dests) {
+                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    public static ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
+        if (dests != null && dests.length > 0) {
+            for (ActiveMQDestination dest : dests) {
+                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+                    return dest;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    public static boolean isDestForcedDurable(final ActiveMQDestination destination) {
+        boolean isForceDurable = false;
+        if (destination != null) {
+            final Map<String, String> options = destination.getOptions();
+
+            if (options != null) {
+                isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
+            }
+        }
+
+        return isForceDurable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 62b3dec..4a705f3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.network;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.net.URI;
 import java.util.Arrays;
@@ -23,17 +26,25 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.virtual.CompositeTopic;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
 import org.apache.activemq.util.Wait;
@@ -57,11 +68,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
 
     protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
 
+    protected JavaRuntimeConfigurationBroker remoteRuntimeBroker;
     protected String staticIncludeTopics = "include.static.test";
     protected String includedTopics = "include.test.>";
     protected String testTopicName2 = "include.test.bar2";
     private boolean dynamicOnly = false;
     private boolean forceDurable = false;
+    private boolean useVirtualDestSubs = false;
     private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
     public static enum FLOW {FORWARD, REVERSE};
 
@@ -107,6 +120,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         staticIncludeTopics = "include.static.test";
         dynamicOnly = false;
         forceDurable = false;
+        useVirtualDestSubs = false;
         remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
         doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder());
     }
@@ -521,6 +535,116 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
 
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testVirtualDestSubForceDurableSync() throws Exception {
+        Assume.assumeTrue(flow == FLOW.FORWARD);
+        forceDurable = true;
+        useVirtualDestSubs = true;
+        this.restartBrokers(true);
+
+        //configure a virtual destination that forwards messages from topic testQueueName
+        CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
+                new ActiveMQQueue("include.test.bar.bridge"));
+        remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true);
+
+        MessageProducer includedProducer = localSession.createProducer(included);
+        Message test = localSession.createTextMessage("test");
+
+        final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics();
+        final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination(
+                new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
+
+        //Make sure that the NC durable is created because of the compositeTopic
+        waitForConsumerCount(destinationStatistics, 1);
+        assertNCDurableSubsCount(localBroker, included, 1);
+
+        //Send message and make sure it is dispatched across the bridge
+        includedProducer.send(test);
+        waitForDispatchFromLocalBroker(destinationStatistics, 1);
+        assertLocalBrokerStatistics(destinationStatistics, 1);
+        assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount());
+
+        //Stop the remote broker so the bridge stops and then send 500 messages so
+        //the messages build up on the NC durable
+        this.stopRemoteBroker();
+        for (int i = 0; i < 500; i++) {
+            includedProducer.send(test);
+        }
+        this.stopLocalBroker();
+
+        //Restart the brokers
+        this.restartRemoteBroker();
+        remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true);
+        this.restartLocalBroker(true);
+
+        //We now need to verify that 501 messages made it to the queue on the remote side
+        //which means that the NC durable was not deleted and recreated during the sync
+        final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination(
+                new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
+
+        assertTrue(Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return remoteDestStatistics2.getMessages().getCount() == 501;
+            }
+        }));
+
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testForceDurableTopicSubSync() throws Exception {
+        Assume.assumeTrue(flow == FLOW.FORWARD);
+        forceDurable = true;
+        this.restartBrokers(true);
+
+        //configure a virtual destination that forwards messages from topic testQueueName
+        remoteSession.createConsumer(included);
+
+        MessageProducer includedProducer = localSession.createProducer(included);
+        Message test = localSession.createTextMessage("test");
+
+        final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics();
+
+        //Make sure that the NC durable is created because of the compositeTopic
+        waitForConsumerCount(destinationStatistics, 1);
+        assertNCDurableSubsCount(localBroker, included, 1);
+
+        //Send message and make sure it is dispatched across the bridge
+        includedProducer.send(test);
+        waitForDispatchFromLocalBroker(destinationStatistics, 1);
+        assertLocalBrokerStatistics(destinationStatistics, 1);
+
+        //Stop the network connector and send messages to the local broker so they build
+        //up on the durable
+        this.localBroker.getNetworkConnectorByName("networkConnector").stop();
+
+        for (int i = 0; i < 500; i++) {
+            includedProducer.send(test);
+        }
+
+        //restart the local broker and bridge
+        this.stopLocalBroker();
+        this.restartLocalBroker(true);
+
+        //We now need to verify that the 500 messages on the NC durable are dispatched
+        //on bridge sync which shows that the durable wasn't destroyed/recreated
+        final DestinationStatistics destinationStatistics2 =
+                localBroker.getDestination(included).getDestinationStatistics();
+        waitForDispatchFromLocalBroker(destinationStatistics2, 500);
+        assertLocalBrokerStatistics(destinationStatistics2, 500);
+
+    }
+
+    protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination...forwardTo) {
+        CompositeTopic compositeTopic = new CompositeTopic();
+        compositeTopic.setName(name);
+        compositeTopic.setForwardOnly(true);
+        compositeTopic.setForwardTo( Lists.newArrayList(forwardTo));
+
+        return compositeTopic;
+    }
+
     protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception {
         if (broker.getBrokerName().equals("localBroker")) {
             restartLocalBroker(startNetworkConnector);
@@ -607,12 +731,15 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         if (flow.equals(FLOW.FORWARD)) {
             broker2 = remoteBroker;
             session2 = remoteSession;
+            remoteRuntimeBroker = (JavaRuntimeConfigurationBroker)
+                    remoteBroker.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
         } else {
             broker1 = remoteBroker;
             session1 = remoteSession;
         }
     }
 
+
     protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception {
         BrokerService brokerService = new BrokerService();
         brokerService.setMonitorConnectionSplits(true);
@@ -622,6 +749,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         adapter.setDirectory(dataDir);
         adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
         brokerService.setPersistenceAdapter(adapter);
+        brokerService.setUseVirtualDestSubs(useVirtualDestSubs);
+        brokerService.setUseVirtualDestSubsOnCreation(useVirtualDestSubs);
 
         if (startNetworkConnector) {
             brokerService.addNetworkConnector(configureLocalNetworkConnector());
@@ -645,10 +774,11 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         connector.setDuplex(true);
         connector.setStaticBridge(false);
         connector.setSyncDurableSubs(true);
+        connector.setUseVirtualDestSubs(useVirtualDestSubs);
         connector.setStaticallyIncludedDestinations(
                 Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(staticIncludeTopics + "?forceDurable=" + forceDurable)));
         connector.setDynamicallyIncludedDestinations(
-                Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(includedTopics)));
+                Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(includedTopics + "?forceDurable=" + forceDurable)));
         connector.setExcludedDestinations(
                 Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(excludeTopicName)));
         return connector;
@@ -665,6 +795,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         adapter.setDirectory(dataDir);
         adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
         brokerService.setPersistenceAdapter(adapter);
+        brokerService.setUseVirtualDestSubs(useVirtualDestSubs);
+        brokerService.setUseVirtualDestSubsOnCreation(useVirtualDestSubs);
+
+        if (useVirtualDestSubs) {
+            brokerService.setPlugins(new BrokerPlugin[] {new JavaRuntimeConfigurationPlugin()});
+        }
 
         remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);