You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2017/11/09 16:23:14 UTC

activemq git commit: AMQ-6858 - Fix several durable subscription bridge propagation issues

Repository: activemq
Updated Branches:
  refs/heads/master 7dad09a9c -> 96ce14b27


AMQ-6858 - Fix several durable subscription bridge propagation issues

Durable network proxy subs will now be properly created across multiple
bridges when 3 or more brokers are used.  Demand will be properly synced
and removed.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/96ce14b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/96ce14b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/96ce14b2

Branch: refs/heads/master
Commit: 96ce14b278fef9e5f428f1c3c07ce5c09fd8f9a8
Parents: 7dad09a
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon Nov 6 08:42:03 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Nov 9 11:21:18 2017 -0500

----------------------------------------------------------------------
 .../apache/activemq/network/ConduitBridge.java  |  16 +-
 .../network/DemandForwardingBridgeSupport.java  |  49 +++-
 .../activemq/network/DurableConduitBridge.java  |  19 +-
 .../network/NetworkBridgeConfiguration.java     |  13 +
 .../DurableThreeBrokerNetworkBridgeTest.java    | 241 +++++++++++++++++++
 .../VerifyNetworkConsumersDisconnectTest.java   |  68 +++++-
 6 files changed, 384 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 3c0b85b..6ced896 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -56,12 +56,16 @@ public class ConduitBridge extends DemandForwardingBridge {
     }
 
     protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) {
-        // search through existing subscriptions and see if we have a match
-        if (info.isNetworkSubscription()) {
+        //If a network subscription and a queue check if isConduitNetworkQueueSubscriptions is true
+        //If true then we want to try and conduit
+        //For topics we always want to conduit regardless of network subscription or not
+        if (info.isNetworkSubscription() && info.getDestination().isQueue() &&
+                !configuration.isConduitNetworkQueueSubscriptions()) {
             return false;
         }
         boolean matched = false;
 
+        // search through existing subscriptions and see if we have a match
         for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
             DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
             if (canConduit(ds) && filter.matches(info.getDestination())) {
@@ -86,9 +90,13 @@ public class ConduitBridge extends DemandForwardingBridge {
     }
 
     // we want to conduit statically included consumers which are local networkSubs
-    // but we don't want to conduit remote network subs i.e. (proxy proxy) consumers
+    // but we don't want to conduit remote network queue subs i.e. (proxy proxy) consumers
+    // unless isConduitNetworkQueueSubscriptions is true
+    // We always want to conduit topic subscriptions
     private boolean canConduit(DemandSubscription ds) {
-        return ds.isStaticallyIncluded() || !ds.getRemoteInfo().isNetworkSubscription();
+        return ds.isStaticallyIncluded() || ds.getRemoteInfo().getDestination().isTopic() ||
+                !ds.getRemoteInfo().isNetworkSubscription() ||
+                (ds.getRemoteInfo().getDestination().isQueue() && configuration.isConduitNetworkQueueSubscriptions());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 879ab39..efdfa5a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -661,6 +661,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         }
     }
 
+    /**
+     * Checks whether or not this consumer is a direct bridge network subscription
+     * @param info
+     * @return
+     */
+    protected boolean isBridgeNS(ConsumerInfo info) {
+        return (info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
+                (info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
+    }
+
     protected void serviceRemoteCommand(Command command) {
         if (!disposed.get()) {
             try {
@@ -694,7 +704,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                                                 for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
                                                     //re-add any process any non-NC consumers that match the
                                                     //dynamicallyIncludedDestinations list
-                                                    if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
+                                                    //Also re-add network consumers that are not part of this direct
+                                                    //bridge (proxy of proxy bridges)
+                                                    if((info.getSubscriptionName() == null || !isBridgeNS(info)) &&
                                                             NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
                                                         serviceRemoteConsumerAdvisory(info);
                                                     }
@@ -986,7 +998,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             localBroker.oneway(sending);
 
             //remove subscriber from map
-            i.remove();
+            if (i != null) {
+                i.remove();
+            }
         }
     }
 
@@ -1072,7 +1086,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
             subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
 
-            // continue removal in separate thread to free up this thread for outstanding responses
+            // continue removal in separate thread to free up tshis thread for outstanding responses
             // Serialize with removeDestination operations so that removeSubs are serialized with
             // removeDestinations such that all removeSub advisories are generated
             serialExecutor.execute(new Runnable() {
@@ -1080,7 +1094,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 public void run() {
                     sub.waitForCompletion();
                     try {
-                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+                        //If removing a network durable subscription that still has durable remote subs
+                        //make sure we cleanup the durable subscription properly - necessary when using
+                        //durable subscriptions and 3 or more brokers
+                        if (configuration.isConduitSubscriptions() &&
+                                sub.getLocalInfo().getSubscriptionName() != null &&
+                                sub.getLocalInfo().getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
+                                sub.getDurableRemoteSubs().size() > 0) {
+                            sub.getDurableRemoteSubs().clear();
+                            cleanupDurableSub(sub, null);
+                        } else {
+                            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+                        }
                     } catch (IOException e) {
                         LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
                     }
@@ -1315,13 +1340,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             for (ActiveMQDestination dest : dests) {
                 if (isPermissableDestination(dest)) {
                     DemandSubscription sub = createDemandSubscription(dest, null);
-                    sub.setStaticallyIncluded(true);
-                    try {
-                        addSubscription(sub);
-                    } catch (IOException e) {
-                        LOG.error("Failed to add static destination {}", dest, e);
+                    if (sub != null) {
+                        sub.setStaticallyIncluded(true);
+                        try {
+                            addSubscription(sub);
+                        } catch (IOException e) {
+                            LOG.error("Failed to add static destination {}", dest, e);
+                        }
+                        LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
+                    } else {
+                        LOG.info("{}, static destination excluded: {}, demand already exists", configuration.getBrokerName(), dest);
                     }
-                    LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
                 } else {
                     LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index fb2b6c9..42f30a4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -75,12 +75,15 @@ public class DurableConduitBridge extends ConduitBridge {
                             String candidateSubName = getSubscriberName(dest);
                             for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
                                 String subName = subscription.getConsumerInfo().getSubscriptionName();
-                                if (subName != null && subName.equals(candidateSubName)) {
+                                String clientId = subscription.getContext().getClientId();
+                                if (subName != null && subName.equals(candidateSubName) && clientId.startsWith(configuration.getName())) {
                                     DemandSubscription sub = createDemandSubscription(dest, subName);
-                                    sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
-                                    sub.setStaticallyIncluded(true);
-                                    addSubscription(sub);
-                                    break;
+                                    if (sub != null) {
+                                        sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
+                                        sub.setStaticallyIncluded(true);
+                                        addSubscription(sub);
+                                        break;
+                                    }
                                 }
                             }
                         }
@@ -139,8 +142,12 @@ public class DurableConduitBridge extends ConduitBridge {
             info.setSubscriptionName(getSubscriberName(info.getDestination()));
             // and override the consumerId with something unique so that it won't
             // be removed if the durable subscriber (at the other end) goes away
-            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
+            //Only do this for direct bridge consumers - proxy network consumers we don't
+            //want to replace the consumerId or cleanup won't happen properly
+            if (info.getBrokerPath().length == 1 || (info.getBrokerPath().length > 1 && info.getBrokerPath()[0] == remoteBrokerPath[0])) {
+                info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
                                consumerIdGenerator.getNextSequenceId()));
+            }
         }
         info.setSelector(null);
         DemandSubscription demandSubscription = doCreateDemandSubscription(info);

http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 9e596d4..b2ca78a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -29,6 +29,11 @@ import org.apache.activemq.command.ConsumerInfo;
 public class NetworkBridgeConfiguration {
 
     private boolean conduitSubscriptions = true;
+    /**
+     * Whether or not network subscriptions on queues are eligible to be conduit
+     * Default is false
+     */
+    private boolean conduitNetworkQueueSubscriptions;
     private boolean useVirtualDestSubs;
     private boolean dynamicOnly;
     private boolean syncDurableSubs;
@@ -85,6 +90,14 @@ public class NetworkBridgeConfiguration {
         this.conduitSubscriptions = conduitSubscriptions;
     }
 
+    public boolean isConduitNetworkQueueSubscriptions() {
+        return conduitNetworkQueueSubscriptions;
+    }
+
+    public void setConduitNetworkQueueSubscriptions(boolean conduitNetworkQueueSubscriptions) {
+        this.conduitNetworkQueueSubscriptions = conduitNetworkQueueSubscriptions;
+    }
+
     /**
      * @return the dynamicOnly
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
new file mode 100644
index 0000000..ff09a1c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Test;
+
+/**
+ * Test to make sure durable subscriptions propagate properly throughout network bridges
+ * and that conduit subscriptions work properly
+ */
+public class DurableThreeBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
+
+    @Override
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
+        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName);
+        connector.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true")));
+        connector.setDuplex(true);
+        connector.setDecreaseNetworkConsumerPriority(false);
+        connector.setConduitSubscriptions(true);
+        connector.setSyncDurableSubs(true);
+        connector.setNetworkTTL(-1);
+        return connector;
+    }
+
+    /**
+     * BrokerA -> BrokerB -> BrokerC
+     */
+    public void testDurablePropagation() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientA.receive(1000));
+        assertNotNull(clientB.receive(1000));
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses.unsubscribe("subB");
+        ses2.unsubscribe("subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+    }
+
+    public void testForceDurablePropagation() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createConsumer(dest);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientA.receive(1000));
+
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientC = ses2.createConsumer(dest);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientC.close();
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+    }
+
+    public void testDurablePropagationSync() throws Exception {
+        // Setup broker networks
+        NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
+        NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        nc1.stop();
+        nc2.stop();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+        nc1.start();
+        nc2.start();
+
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+    }
+
+
+    protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
+            final int count) throws Exception {
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return count == getNCDurableSubs(brokerService, dest).size();
+            }
+        }, 10000, 500));
+    }
+
+    protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
+            final ActiveMQTopic dest) throws Exception {
+        List<DurableTopicSubscription> subs = new ArrayList<>();
+        Destination d = brokerService.getDestination(dest);
+        org.apache.activemq.broker.region.Topic destination = null;
+        if (d instanceof DestinationFilter) {
+            destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
+        } else {
+            destination = (org.apache.activemq.broker.region.Topic) d;
+        }
+
+        for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
+            if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
+                DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
+                if (sub != null) {
+                    subs.add(sub);
+                }
+            }
+        }
+
+        return subs;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        String options = new String("?persistent=false&useJmx=false");
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
+        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) {
+        broker.setBrokerId(broker.getBrokerName());
+    }
+
+    protected Session createSession(String broker) throws Exception {
+        Connection con = createConnection(broker);
+        con.start();
+        return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public static Test suite() {
+        return suite(DurableThreeBrokerNetworkBridgeTest.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
index 9eeb28c..d04e1f5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
@@ -61,14 +61,19 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
                 unhandledExceptions.isEmpty());
     }
 
-    public NetworkConnector bridge(String from, String to) throws Exception {
+    public NetworkConnector bridge(String from, String to, boolean conduitNetworkQueueSubscriptions) throws Exception {
         NetworkConnector networkConnector = bridgeBrokers(from, to, true, NETWORK_TTL, CONDUIT);
         networkConnector.setSuppressDuplicateQueueSubscriptions(true);
         networkConnector.setDecreaseNetworkConsumerPriority(true);
         networkConnector.setDuplex(DUPLEX);
+        networkConnector.setConduitNetworkQueueSubscriptions(conduitNetworkQueueSubscriptions);
         return networkConnector;
     }
 
+    public NetworkConnector bridge(String from, String to) throws Exception {
+        return bridge(from, to, false);
+    }
+
     /*why conduit proxy proxy consumers gets us in a knot w.r.t removal
     DC-7 for CA-9, add DB-15, remove CA-9, add CB-8
     CB-8 add DC-7
@@ -137,6 +142,63 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
 
     }
 
+    public void testConsumerOnEachBrokerNetworkQueueConduitSubs() throws Exception {
+        bridge("Broker0", "Broker1", true);
+        if (!DUPLEX) bridge("Broker1", "Broker0", true);
+
+        bridge("Broker1", "Broker2", true);
+        if (!DUPLEX) bridge("Broker2", "Broker1", true);
+
+        startAllBrokers();
+        waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
+
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest));
+        }
+
+        //Conduit network queue conduit subs is true so should only be 2 subs
+        assertExactConsumersConnect("Broker0", 2, 1, TIMEOUT);
+        assertExactConsumersConnect("Broker2", 2, 1, TIMEOUT);
+        // still should be 3 subs for the middle broker, 1 for each direction
+        assertExactConsumersConnect("Broker1", 3, 1, TIMEOUT);
+
+        assertNoUnhandledExceptions();
+
+        LOG.info("Complete the mesh - 0->2");
+
+        // shorter route
+        NetworkConnector nc = bridge("Broker0", "Broker2");
+        nc.setBrokerName("Broker0");
+        nc.start();
+
+
+        if (!DUPLEX) {
+            LOG.info("... complete the mesh - 2->0");
+            nc = bridge("Broker2", "Broker0");
+            nc.setBrokerName("Broker2");
+            nc.start();
+        }
+
+        // reverse order close
+        consumerMap.get("Consumer:" + 2 + ":0").close();
+        TimeUnit.SECONDS.sleep(1);
+        consumerMap.get("Consumer:" + 1 + ":0").close();
+        TimeUnit.SECONDS.sleep(1);
+        consumerMap.get("Consumer:" + 0 + ":0").close();
+
+        LOG.info("Check for no consumers..");
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
+        }
+
+    }
+
     public void testXConsumerOnEachBroker() throws Exception {
         bridge("Broker0", "Broker1");
         if (!DUPLEX) bridge("Broker1", "Broker0");
@@ -232,13 +294,14 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
         }, timeout));
     }
 
+    @Override
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
 
         unhandledExceptions.clear();
         Thread.setDefaultUncaughtExceptionHandler(this);
-        
+
         // Setup n brokers
         for (int i = 0; i < BROKER_COUNT; i++) {
             createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true&brokerId=Broker" + i));
@@ -256,6 +319,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
         brokerService.setDestinationPolicy(policyMap);
     }
 
+    @Override
     public void uncaughtException(Thread t, Throwable e) {
         synchronized(unhandledExceptions) {
             unhandledExceptions.put(t, e);