You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2017/11/16 12:51:22 UTC

[1/6] activemq git commit: AMQ-6858 - Fix several durable subscription bridge propagation issues

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x 39cbdf706 -> 1cfc9ff9a


AMQ-6858 - Fix several durable subscription bridge propagation issues

Durable network proxy subs will now be properly created across multiple
bridges when 3 or more brokers are used.  Demand will be properly synced
and removed.

(cherry picked from commit 96ce14b278fef9e5f428f1c3c07ce5c09fd8f9a8)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d5a987b6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d5a987b6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d5a987b6

Branch: refs/heads/activemq-5.15.x
Commit: d5a987b6a55233f9a83e108521746cf55d3eb32f
Parents: 39cbdf7
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon Nov 6 08:42:03 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Nov 16 07:50:12 2017 -0500

----------------------------------------------------------------------
 .../apache/activemq/network/ConduitBridge.java  |  16 +-
 .../network/DemandForwardingBridgeSupport.java  |  49 +++-
 .../activemq/network/DurableConduitBridge.java  |  19 +-
 .../network/NetworkBridgeConfiguration.java     |  13 +
 .../DurableThreeBrokerNetworkBridgeTest.java    | 241 +++++++++++++++++++
 .../VerifyNetworkConsumersDisconnectTest.java   |  68 +++++-
 6 files changed, 384 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d5a987b6/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 3c0b85b..6ced896 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -56,12 +56,16 @@ public class ConduitBridge extends DemandForwardingBridge {
     }
 
     protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) {
-        // search through existing subscriptions and see if we have a match
-        if (info.isNetworkSubscription()) {
+        //If a network subscription and a queue check if isConduitNetworkQueueSubscriptions is true
+        //If true then we want to try and conduit
+        //For topics we always want to conduit regardless of network subscription or not
+        if (info.isNetworkSubscription() && info.getDestination().isQueue() &&
+                !configuration.isConduitNetworkQueueSubscriptions()) {
             return false;
         }
         boolean matched = false;
 
+        // search through existing subscriptions and see if we have a match
         for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
             DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
             if (canConduit(ds) && filter.matches(info.getDestination())) {
@@ -86,9 +90,13 @@ public class ConduitBridge extends DemandForwardingBridge {
     }
 
     // we want to conduit statically included consumers which are local networkSubs
-    // but we don't want to conduit remote network subs i.e. (proxy proxy) consumers
+    // but we don't want to conduit remote network queue subs i.e. (proxy proxy) consumers
+    // unless isConduitNetworkQueueSubscriptions is true
+    // We always want to conduit topic subscriptions
     private boolean canConduit(DemandSubscription ds) {
-        return ds.isStaticallyIncluded() || !ds.getRemoteInfo().isNetworkSubscription();
+        return ds.isStaticallyIncluded() || ds.getRemoteInfo().getDestination().isTopic() ||
+                !ds.getRemoteInfo().isNetworkSubscription() ||
+                (ds.getRemoteInfo().getDestination().isQueue() && configuration.isConduitNetworkQueueSubscriptions());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/d5a987b6/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 879ab39..efdfa5a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -661,6 +661,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         }
     }
 
+    /**
+     * Checks whether or not this consumer is a direct bridge network subscription
+     * @param info
+     * @return
+     */
+    protected boolean isBridgeNS(ConsumerInfo info) {
+        return (info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
+                (info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
+    }
+
     protected void serviceRemoteCommand(Command command) {
         if (!disposed.get()) {
             try {
@@ -694,7 +704,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                                                 for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
                                                     //re-add any process any non-NC consumers that match the
                                                     //dynamicallyIncludedDestinations list
-                                                    if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
+                                                    //Also re-add network consumers that are not part of this direct
+                                                    //bridge (proxy of proxy bridges)
+                                                    if((info.getSubscriptionName() == null || !isBridgeNS(info)) &&
                                                             NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
                                                         serviceRemoteConsumerAdvisory(info);
                                                     }
@@ -986,7 +998,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             localBroker.oneway(sending);
 
             //remove subscriber from map
-            i.remove();
+            if (i != null) {
+                i.remove();
+            }
         }
     }
 
@@ -1072,7 +1086,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
             subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
 
-            // continue removal in separate thread to free up this thread for outstanding responses
+            // continue removal in separate thread to free up tshis thread for outstanding responses
             // Serialize with removeDestination operations so that removeSubs are serialized with
             // removeDestinations such that all removeSub advisories are generated
             serialExecutor.execute(new Runnable() {
@@ -1080,7 +1094,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 public void run() {
                     sub.waitForCompletion();
                     try {
-                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+                        //If removing a network durable subscription that still has durable remote subs
+                        //make sure we cleanup the durable subscription properly - necessary when using
+                        //durable subscriptions and 3 or more brokers
+                        if (configuration.isConduitSubscriptions() &&
+                                sub.getLocalInfo().getSubscriptionName() != null &&
+                                sub.getLocalInfo().getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
+                                sub.getDurableRemoteSubs().size() > 0) {
+                            sub.getDurableRemoteSubs().clear();
+                            cleanupDurableSub(sub, null);
+                        } else {
+                            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+                        }
                     } catch (IOException e) {
                         LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
                     }
@@ -1315,13 +1340,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             for (ActiveMQDestination dest : dests) {
                 if (isPermissableDestination(dest)) {
                     DemandSubscription sub = createDemandSubscription(dest, null);
-                    sub.setStaticallyIncluded(true);
-                    try {
-                        addSubscription(sub);
-                    } catch (IOException e) {
-                        LOG.error("Failed to add static destination {}", dest, e);
+                    if (sub != null) {
+                        sub.setStaticallyIncluded(true);
+                        try {
+                            addSubscription(sub);
+                        } catch (IOException e) {
+                            LOG.error("Failed to add static destination {}", dest, e);
+                        }
+                        LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
+                    } else {
+                        LOG.info("{}, static destination excluded: {}, demand already exists", configuration.getBrokerName(), dest);
                     }
-                    LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
                 } else {
                     LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d5a987b6/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index fb2b6c9..42f30a4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -75,12 +75,15 @@ public class DurableConduitBridge extends ConduitBridge {
                             String candidateSubName = getSubscriberName(dest);
                             for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
                                 String subName = subscription.getConsumerInfo().getSubscriptionName();
-                                if (subName != null && subName.equals(candidateSubName)) {
+                                String clientId = subscription.getContext().getClientId();
+                                if (subName != null && subName.equals(candidateSubName) && clientId.startsWith(configuration.getName())) {
                                     DemandSubscription sub = createDemandSubscription(dest, subName);
-                                    sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
-                                    sub.setStaticallyIncluded(true);
-                                    addSubscription(sub);
-                                    break;
+                                    if (sub != null) {
+                                        sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
+                                        sub.setStaticallyIncluded(true);
+                                        addSubscription(sub);
+                                        break;
+                                    }
                                 }
                             }
                         }
@@ -139,8 +142,12 @@ public class DurableConduitBridge extends ConduitBridge {
             info.setSubscriptionName(getSubscriberName(info.getDestination()));
             // and override the consumerId with something unique so that it won't
             // be removed if the durable subscriber (at the other end) goes away
-            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
+            //Only do this for direct bridge consumers - proxy network consumers we don't
+            //want to replace the consumerId or cleanup won't happen properly
+            if (info.getBrokerPath().length == 1 || (info.getBrokerPath().length > 1 && info.getBrokerPath()[0] == remoteBrokerPath[0])) {
+                info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
                                consumerIdGenerator.getNextSequenceId()));
+            }
         }
         info.setSelector(null);
         DemandSubscription demandSubscription = doCreateDemandSubscription(info);

http://git-wip-us.apache.org/repos/asf/activemq/blob/d5a987b6/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 9e596d4..b2ca78a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -29,6 +29,11 @@ import org.apache.activemq.command.ConsumerInfo;
 public class NetworkBridgeConfiguration {
 
     private boolean conduitSubscriptions = true;
+    /**
+     * Whether or not network subscriptions on queues are eligible to be conduit
+     * Default is false
+     */
+    private boolean conduitNetworkQueueSubscriptions;
     private boolean useVirtualDestSubs;
     private boolean dynamicOnly;
     private boolean syncDurableSubs;
@@ -85,6 +90,14 @@ public class NetworkBridgeConfiguration {
         this.conduitSubscriptions = conduitSubscriptions;
     }
 
+    public boolean isConduitNetworkQueueSubscriptions() {
+        return conduitNetworkQueueSubscriptions;
+    }
+
+    public void setConduitNetworkQueueSubscriptions(boolean conduitNetworkQueueSubscriptions) {
+        this.conduitNetworkQueueSubscriptions = conduitNetworkQueueSubscriptions;
+    }
+
     /**
      * @return the dynamicOnly
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/d5a987b6/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
new file mode 100644
index 0000000..ff09a1c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Test;
+
+/**
+ * Test to make sure durable subscriptions propagate properly throughout network bridges
+ * and that conduit subscriptions work properly
+ */
+public class DurableThreeBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
+
+    @Override
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
+        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName);
+        connector.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true")));
+        connector.setDuplex(true);
+        connector.setDecreaseNetworkConsumerPriority(false);
+        connector.setConduitSubscriptions(true);
+        connector.setSyncDurableSubs(true);
+        connector.setNetworkTTL(-1);
+        return connector;
+    }
+
+    /**
+     * BrokerA -> BrokerB -> BrokerC
+     */
+    public void testDurablePropagation() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientA.receive(1000));
+        assertNotNull(clientB.receive(1000));
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses.unsubscribe("subB");
+        ses2.unsubscribe("subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+    }
+
+    public void testForceDurablePropagation() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createConsumer(dest);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientA.receive(1000));
+
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientC = ses2.createConsumer(dest);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientC.close();
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+    }
+
+    public void testDurablePropagationSync() throws Exception {
+        // Setup broker networks
+        NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
+        NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        nc1.stop();
+        nc2.stop();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+        nc1.start();
+        nc2.start();
+
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+    }
+
+
+    protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
+            final int count) throws Exception {
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return count == getNCDurableSubs(brokerService, dest).size();
+            }
+        }, 10000, 500));
+    }
+
+    protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
+            final ActiveMQTopic dest) throws Exception {
+        List<DurableTopicSubscription> subs = new ArrayList<>();
+        Destination d = brokerService.getDestination(dest);
+        org.apache.activemq.broker.region.Topic destination = null;
+        if (d instanceof DestinationFilter) {
+            destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
+        } else {
+            destination = (org.apache.activemq.broker.region.Topic) d;
+        }
+
+        for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
+            if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
+                DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
+                if (sub != null) {
+                    subs.add(sub);
+                }
+            }
+        }
+
+        return subs;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        String options = new String("?persistent=false&useJmx=false");
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
+        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) {
+        broker.setBrokerId(broker.getBrokerName());
+    }
+
+    protected Session createSession(String broker) throws Exception {
+        Connection con = createConnection(broker);
+        con.start();
+        return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public static Test suite() {
+        return suite(DurableThreeBrokerNetworkBridgeTest.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d5a987b6/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
index 9eeb28c..d04e1f5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
@@ -61,14 +61,19 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
                 unhandledExceptions.isEmpty());
     }
 
-    public NetworkConnector bridge(String from, String to) throws Exception {
+    public NetworkConnector bridge(String from, String to, boolean conduitNetworkQueueSubscriptions) throws Exception {
         NetworkConnector networkConnector = bridgeBrokers(from, to, true, NETWORK_TTL, CONDUIT);
         networkConnector.setSuppressDuplicateQueueSubscriptions(true);
         networkConnector.setDecreaseNetworkConsumerPriority(true);
         networkConnector.setDuplex(DUPLEX);
+        networkConnector.setConduitNetworkQueueSubscriptions(conduitNetworkQueueSubscriptions);
         return networkConnector;
     }
 
+    public NetworkConnector bridge(String from, String to) throws Exception {
+        return bridge(from, to, false);
+    }
+
     /*why conduit proxy proxy consumers gets us in a knot w.r.t removal
     DC-7 for CA-9, add DB-15, remove CA-9, add CB-8
     CB-8 add DC-7
@@ -137,6 +142,63 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
 
     }
 
+    public void testConsumerOnEachBrokerNetworkQueueConduitSubs() throws Exception {
+        bridge("Broker0", "Broker1", true);
+        if (!DUPLEX) bridge("Broker1", "Broker0", true);
+
+        bridge("Broker1", "Broker2", true);
+        if (!DUPLEX) bridge("Broker2", "Broker1", true);
+
+        startAllBrokers();
+        waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
+
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest));
+        }
+
+        //Conduit network queue conduit subs is true so should only be 2 subs
+        assertExactConsumersConnect("Broker0", 2, 1, TIMEOUT);
+        assertExactConsumersConnect("Broker2", 2, 1, TIMEOUT);
+        // still should be 3 subs for the middle broker, 1 for each direction
+        assertExactConsumersConnect("Broker1", 3, 1, TIMEOUT);
+
+        assertNoUnhandledExceptions();
+
+        LOG.info("Complete the mesh - 0->2");
+
+        // shorter route
+        NetworkConnector nc = bridge("Broker0", "Broker2");
+        nc.setBrokerName("Broker0");
+        nc.start();
+
+
+        if (!DUPLEX) {
+            LOG.info("... complete the mesh - 2->0");
+            nc = bridge("Broker2", "Broker0");
+            nc.setBrokerName("Broker2");
+            nc.start();
+        }
+
+        // reverse order close
+        consumerMap.get("Consumer:" + 2 + ":0").close();
+        TimeUnit.SECONDS.sleep(1);
+        consumerMap.get("Consumer:" + 1 + ":0").close();
+        TimeUnit.SECONDS.sleep(1);
+        consumerMap.get("Consumer:" + 0 + ":0").close();
+
+        LOG.info("Check for no consumers..");
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
+        }
+
+    }
+
     public void testXConsumerOnEachBroker() throws Exception {
         bridge("Broker0", "Broker1");
         if (!DUPLEX) bridge("Broker1", "Broker0");
@@ -232,13 +294,14 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
         }, timeout));
     }
 
+    @Override
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
 
         unhandledExceptions.clear();
         Thread.setDefaultUncaughtExceptionHandler(this);
-        
+
         // Setup n brokers
         for (int i = 0; i < BROKER_COUNT; i++) {
             createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true&brokerId=Broker" + i));
@@ -256,6 +319,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
         brokerService.setDestinationPolicy(policyMap);
     }
 
+    @Override
     public void uncaughtException(Thread t, Throwable e) {
         synchronized(unhandledExceptions) {
             unhandledExceptions.put(t, e);


[4/6] activemq git commit: AMQ-6858 - handle resync of network proxy durables after restart

Posted by cs...@apache.org.
AMQ-6858 - handle resync of network proxy durables after restart

We need to properly handle the re-addition of network proxy durables
after the brokers are restarted so removal is done properly

(cherry picked from commit 6013441a9a7c4a13f7412d6d72638de0f420e6a3)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/247243c9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/247243c9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/247243c9

Branch: refs/heads/activemq-5.15.x
Commit: 247243c9c120f5d1c526330813dcc2b8079f52c3
Parents: 39d6321
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Nov 15 08:22:47 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Nov 16 07:50:40 2017 -0500

----------------------------------------------------------------------
 .../apache/activemq/network/ConduitBridge.java  |  14 ++-
 .../network/DemandForwardingBridgeSupport.java  |  89 +++++++++-----
 .../activemq/network/DemandSubscription.java    |   6 -
 .../DurableFiveBrokerNetworkBridgeTest.java     | 117 ++++++++++++++++++-
 4 files changed, 185 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index bc9d004..70f45f7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -81,10 +81,16 @@ public class ConduitBridge extends DemandForwardingBridge {
                         ds.addForcedDurableConsumer(info.getConsumerId());
                     }
                 } else {
-                	if (isProxyNSConsumer(info)) {
-                	    final BrokerId[] path = info.getBrokerPath();
-                	    addProxyNetworkSubscription(ds, path, info.getSubscriptionName());
-                	} else {
+                    //Handle the demand generated by proxy network subscriptions
+                    //The broker path is case is normal
+                    if (isProxyNSConsumerBrokerPath(info)) {
+                        final BrokerId[] path = info.getBrokerPath();
+                        addProxyNetworkSubscriptionBrokerPath(ds, path, info.getSubscriptionName());
+                    //This is the durable sync case on broker restart
+                    } else if (isProxyNSConsumerClientId(info.getClientId()) &&
+                            isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) {
+                        addProxyNetworkSubscriptionClientId(ds, info.getClientId(), info.getSubscriptionName());
+                    } else {
                 		ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
                 	}
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 75084d1..df493c3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -36,7 +36,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import javax.management.ObjectName;
@@ -672,31 +671,53 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 (info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
     }
 
-    private boolean isProxyBridgeSubscription(SubscriptionInfo info) {
-        if (info.getSubcriptionName() != null && info.getClientId() != null) {
-            if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)
-                    && !info.getClientId().startsWith(configuration.getName())) {
+    protected boolean isProxyBridgeSubscription(String clientId, String subName) {
+        if (subName != null && clientId != null) {
+            if (subName.startsWith(DURABLE_SUB_PREFIX) && !clientId.startsWith(configuration.getName())) {
                 return true;
             }
         }
         return false;
     }
 
-    protected void addProxyNetworkSubscription(final DemandSubscription sub, final BrokerId[] path, String subName) {
+    /**
+     * This scenaior is primarily used for durable sync on broker restarts
+     *
+     * @param sub
+     * @param clientId
+     * @param subName
+     */
+    protected void addProxyNetworkSubscriptionClientId(final DemandSubscription sub, final String clientId, String subName) {
+        if (clientId != null && sub != null && subName != null) {
+                String newClientId = getProxyBridgeClientId(clientId);
+                final SubscriptionInfo newSubInfo = new SubscriptionInfo(newClientId, subName);
+                sub.getDurableRemoteSubs().add(newSubInfo);
+                LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
+
+        } else {
+            LOG.debug("Skipping addProxyNetworkSubscription");
+        }
+    }
+
+    /**
+     * Add a durable remote proxy subscription when we can generate via the BrokerId path
+     * This is the most common scenario
+     *
+     * @param sub
+     * @param path
+     * @param subName
+     */
+    protected void addProxyNetworkSubscriptionBrokerPath(final DemandSubscription sub, final BrokerId[] path, String subName) {
         if (sub != null && path.length > 1 && subName != null) {
             String b1 = path[path.length-1].toString();
             String b2 = path[path.length-2].toString();
             final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName);
             sub.getDurableRemoteSubs().add(newSubInfo);
-            sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet();
-            LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
-        } else {
-            LOG.debug("Skipping addProxyNetworkSubscription");
         }
     }
 
-    private String getProxyBridgeClientId(SubscriptionInfo info) {
-        String newClientId = info.getClientId();
+    private String getProxyBridgeClientId(String clientId) {
+        String newClientId = clientId;
         String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null;
         if (clientIdTokens != null && clientIdTokens.length > 2) {
             newClientId = clientIdTokens[clientIdTokens.length - 3] +  configuration.getClientIdToken() + "inbound"
@@ -705,10 +726,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return newClientId;
     }
 
-    protected boolean isProxyNSConsumer(ConsumerInfo info) {
+    protected boolean isProxyNSConsumerBrokerPath(ConsumerInfo info) {
         return info.getBrokerPath() != null && info.getBrokerPath().length > 1;
     }
 
+    protected boolean isProxyNSConsumerClientId(String clientId) {
+        return clientId != null && clientId.split(Pattern.quote(configuration.getClientIdToken())).length > 3;
+    }
+
     protected void serviceRemoteCommand(Command command) {
         if (!disposed.get()) {
             try {
@@ -1008,28 +1033,26 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
            }
 
         } else if (data.getClass() == RemoveSubscriptionInfo.class) {
-            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
-            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
+            final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
+            final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
+            final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(),
+                    subscriptionInfo.getSubscriptionName());
             for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                 DemandSubscription ds = i.next();
                 boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
 
-                if (removed) {
-                    cleanupDurableSub(ds, i);
                 //If this is a proxy bridge subscription we need to try changing the clientId
-                } else if (!removed && isProxyBridgeSubscription(subscriptionInfo)){
-                    subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo));
+                if (!removed && proxyBridgeSub){
+                    subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo.getClientId()));
                     if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) {
-                        AtomicInteger count = ds.getNetworkDemandConsumerMap().computeIfAbsent(subscriptionInfo, v -> new AtomicInteger());
-                        count.decrementAndGet();
-                        //Only remove the durable remote sub if the count <= 0
-                        if (count.get() <= 0) {
-                            ds.getDurableRemoteSubs().remove(subscriptionInfo);
-                            ds.getNetworkDemandConsumerMap().remove(subscriptionInfo);
-                            cleanupDurableSub(ds, i);
-                        }
+                        ds.getDurableRemoteSubs().remove(subscriptionInfo);
+                        removed = true;
                     }
                 }
+
+                if (removed) {
+                    cleanupDurableSub(ds, i);
+                }
             }
         }
     }
@@ -1407,9 +1430,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 undoMapRegistration(sub);
             } else {
                 if (consumerInfo.isDurable()) {
-                	if (isProxyNSConsumer(sub.getRemoteInfo())) {
-                		BrokerId[] path = sub.getRemoteInfo().getBrokerPath();
-                		addProxyNetworkSubscription(sub, path, consumerInfo.getSubscriptionName());
+                    //Handle the demand generated by proxy network subscriptions
+                    //The broker path is case is normal
+                    if (isProxyNSConsumerBrokerPath(sub.getRemoteInfo())) {
+                        final BrokerId[] path = info.getBrokerPath();
+                        addProxyNetworkSubscriptionBrokerPath(sub, path, consumerInfo.getSubscriptionName());
+                    //This is the durable sync case on broker restart
+                    } else if (isProxyNSConsumerClientId(sub.getRemoteInfo().getClientId()) &&
+                            isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) {
+                		addProxyNetworkSubscriptionClientId(sub, sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName());
                 	} else {
             			sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
             		}

http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
index 96a9baf..843a6d1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
@@ -44,8 +44,6 @@ public class DemandSubscription {
     private final AtomicBoolean activeWaiter = new AtomicBoolean();
     private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
     private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
-    //Used for proxy network consumers
-    private final Map<SubscriptionInfo, AtomicInteger> networkDemandConsumerMap = new ConcurrentHashMap<>();
     private SubscriptionInfo localDurableSubscriber;
 
     private NetworkBridgeFilter networkBridgeFilter;
@@ -87,10 +85,6 @@ public class DemandSubscription {
         return durableRemoteSubs;
     }
 
-    public Map<SubscriptionInfo, AtomicInteger> getNetworkDemandConsumerMap() {
-        return networkDemandConsumerMap;
-    }
-
     /**
      * @return true if there are no interested consumers
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 94d7393..2d8dc03 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.network;
 
+import java.io.File;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
@@ -46,6 +47,7 @@ import junit.framework.Test;
 public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
 
     private boolean duplex = true;
+    private boolean deletePersistentMessagesOnStartup = true;
 
     @Override
     protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
@@ -61,6 +63,117 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         return connector;
     }
 
+    public void testDurablePropagationBrokerRestartDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagationBrokerRestart();
+    }
+
+    public void testDurablePropagationBrokerRestartOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagationBrokerRestart();
+    }
+
+    protected void testDurablePropagationBrokerRestart() throws Exception {
+        deletePersistentMessagesOnStartup = true;
+
+        // Setup broker networks
+        bridgeBrokers("Broker_A_A", "Broker_B_B");
+        bridgeBrokers("Broker_B_B", "Broker_C_C");
+        bridgeBrokers("Broker_C_C", "Broker_D_D");
+        bridgeBrokers("Broker_D_D", "Broker_E_E");
+
+        if (!duplex) {
+            bridgeBrokers("Broker_B_B", "Broker_A_A");
+            bridgeBrokers("Broker_C_C", "Broker_B_B");
+            bridgeBrokers("Broker_D_D", "Broker_C_C");
+            bridgeBrokers("Broker_E_E", "Broker_D_D");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Connection conn = brokers.get("Broker_A_A").factory.createConnection();
+        conn.setClientID("clientId1");
+        conn.start();
+        Session ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientA2 = ses.createDurableSubscriber(dest, "subA2");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+
+        //bring online a consumer on the other side
+        Connection conn2 = brokers.get("Broker_E_E").factory.createConnection();
+        conn2.setClientID("clientId2");
+        conn2.start();
+        Session ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+        MessageConsumer clientE2 = ses2.createDurableSubscriber(dest, "subE2");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
+
+        clientA.close();
+        clientA2.close();
+        clientE.close();
+        clientE2.close();
+
+        this.destroyAllBrokers();
+        deletePersistentMessagesOnStartup = false;
+        String options = new String("?persistent=true&useJmx=false");
+        createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
+        createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
+        createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
+        createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options));
+        createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options));
+        bridgeBrokers("Broker_A_A", "Broker_B_B");
+        bridgeBrokers("Broker_B_B", "Broker_C_C");
+        bridgeBrokers("Broker_C_C", "Broker_D_D");
+        bridgeBrokers("Broker_D_D", "Broker_E_E");
+        if (!duplex) {
+            bridgeBrokers("Broker_B_B", "Broker_A_A");
+            bridgeBrokers("Broker_C_C", "Broker_B_B");
+            bridgeBrokers("Broker_D_D", "Broker_C_C");
+            bridgeBrokers("Broker_E_E", "Broker_D_D");
+        }
+
+        startAllBrokers();
+
+        conn = brokers.get("Broker_A_A").factory.createConnection();
+        conn.setClientID("clientId1");
+        conn.start();
+        ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        conn2 = brokers.get("Broker_E_E").factory.createConnection();
+        conn2.setClientID("clientId2");
+        conn2.start();
+        ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        //bring one online and leave others offline to test mixed
+        clientE = ses2.createDurableSubscriber(dest, "subE");
+        clientE.close();
+
+        ses.unsubscribe("subA");
+        ses.unsubscribe("subA2");
+        ses2.unsubscribe("subE");
+        ses2.unsubscribe("subE2");
+
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
+    }
+
     public void testDurablePropagationDuplex() throws Exception {
         duplex = true;
         testDurablePropagation();
@@ -552,7 +665,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
-        String options = new String("?persistent=false&useJmx=false");
+        String options = new String("?persistent=true&useJmx=false");
         createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
         createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
         createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
@@ -563,6 +676,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
     @Override
     protected void configureBroker(BrokerService broker) {
         broker.setBrokerId(broker.getBrokerName());
+        broker.setDeleteAllMessagesOnStartup(deletePersistentMessagesOnStartup);
+        broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "DurableFiveBrokerNetworkBridgeTest");
     }
 
     protected Session createSession(String broker) throws Exception {


[5/6] activemq git commit: (AMQ-6858) - test fix

Posted by cs...@apache.org.
(AMQ-6858) - test fix

(cherry picked from commit 2e3a7f6114fae1ffb9c19ee4af2f4541fdfc238e)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3285be60
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3285be60
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3285be60

Branch: refs/heads/activemq-5.15.x
Commit: 3285be60ede1ee6cf377d4d4eeca6fbff79d5dec
Parents: 247243c
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Nov 15 09:21:44 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Nov 16 07:50:47 2017 -0500

----------------------------------------------------------------------
 .../activemq/network/DurableFiveBrokerNetworkBridgeTest.java      | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3285be60/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 2d8dc03..5bbd8b8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -74,8 +74,6 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
     }
 
     protected void testDurablePropagationBrokerRestart() throws Exception {
-        deletePersistentMessagesOnStartup = true;
-
         // Setup broker networks
         bridgeBrokers("Broker_A_A", "Broker_B_B");
         bridgeBrokers("Broker_B_B", "Broker_C_C");
@@ -665,6 +663,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
+        deletePersistentMessagesOnStartup = true;
         String options = new String("?persistent=true&useJmx=false");
         createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
         createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));


[3/6] activemq git commit: AMQ-6858 - Allow configuration of the client Id token in network bridge

Posted by cs...@apache.org.
AMQ-6858 - Allow configuration of the client Id token in network bridge

The client id token that is used to separate parts of a generated local
client id in a network bridge should be configurable so bridge names and
broker names can contain underscores if desired

(cherry picked from commit 3ca439cadaa8c40f506aa1ef683b36624d216254)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/39d6321a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/39d6321a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/39d6321a

Branch: refs/heads/activemq-5.15.x
Commit: 39d6321a4a2e9f929ba9cc451cafa8f244e3d716
Parents: 97fe20a
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Nov 14 14:26:22 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Nov 16 07:50:31 2017 -0500

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |  25 +-
 .../network/NetworkBridgeConfiguration.java     |   9 +
 .../DurableFiveBrokerNetworkBridgeTest.java     | 293 ++++++++++---------
 3 files changed, 167 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/39d6321a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 03e79e4..75084d1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 import javax.management.ObjectName;
 
@@ -492,7 +493,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
                     localConnectionInfo = new ConnectionInfo();
                     localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
+                    localClientId = configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + configuration.getBrokerName();
                     localConnectionInfo.setClientId(localClientId);
                     localConnectionInfo.setUserName(configuration.getUserName());
                     localConnectionInfo.setPassword(configuration.getPassword());
@@ -520,8 +521,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
                         ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
                         duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
-                                + configuration.getBrokerName());
+                        duplexLocalConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + "duplex"
+                                + configuration.getClientIdToken() + configuration.getBrokerName());
                         duplexLocalConnectionInfo.setUserName(configuration.getUserName());
                         duplexLocalConnectionInfo.setPassword(configuration.getPassword());
 
@@ -609,7 +610,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 }
                 remoteConnectionInfo = new ConnectionInfo();
                 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
+                remoteConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + configuration.getBrokerName() + configuration.getClientIdToken() + "outbound");
                 remoteConnectionInfo.setUserName(configuration.getUserName());
                 remoteConnectionInfo.setPassword(configuration.getPassword());
                 remoteBroker.oneway(remoteConnectionInfo);
@@ -685,7 +686,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         if (sub != null && path.length > 1 && subName != null) {
             String b1 = path[path.length-1].toString();
             String b2 = path[path.length-2].toString();
-            final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + "_inbound_" + b1, subName);
+            final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName);
             sub.getDurableRemoteSubs().add(newSubInfo);
             sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet();
             LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
@@ -695,15 +696,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     }
 
     private String getProxyBridgeClientId(SubscriptionInfo info) {
-        String[] clientIdTokens = info.getClientId().split("_");
-        String newClientId = "";
-        if (clientIdTokens.length > 2) {
-            for (int j = clientIdTokens.length - 3; j < clientIdTokens.length; j++) {
-                newClientId += clientIdTokens[j];
-                if (j < clientIdTokens.length -1) {
-                    newClientId += "_";
-                }
-            }
+        String newClientId = info.getClientId();
+        String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null;
+        if (clientIdTokens != null && clientIdTokens.length > 2) {
+            newClientId = clientIdTokens[clientIdTokens.length - 3] +  configuration.getClientIdToken() + "inbound"
+                    + configuration.getClientIdToken() +  clientIdTokens[clientIdTokens.length -1];
         }
         return newClientId;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/39d6321a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index b2ca78a..3c64758 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -59,6 +59,7 @@ public class NetworkBridgeConfiguration {
     private String password;
     private String destinationFilter = null;
     private String name = "NC";
+    private String clientIdToken = "_";
 
     protected List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
     protected List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
@@ -191,6 +192,14 @@ public class NetworkBridgeConfiguration {
         this.brokerName = brokerName;
     }
 
+    public String getClientIdToken() {
+        return clientIdToken;
+    }
+
+    public void setClientIdToken(String clientIdToken) {
+        this.clientIdToken = clientIdToken;
+    }
+
     /**
      * @return the networkTTL
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/39d6321a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 4a63553..94d7393 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -57,6 +57,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         connector.setConduitSubscriptions(true);
         connector.setSyncDurableSubs(true);
         connector.setNetworkTTL(-1);
+        connector.setClientIdToken("|");
         return connector;
     }
 
@@ -71,15 +72,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
     }
 
     /**
-     * BrokerA -> BrokerB -> BrokerC
+     * Broker_A_A -> Broker_B_B -> Broker_C_C
      */
     protected void testDurablePropagation() throws Exception {
         // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("Broker_A_A", "Broker_B_B");
+        bridgeBrokers("Broker_B_B", "Broker_C_C");
         if (!duplex) {
-            bridgeBrokers("BrokerB", "BrokerA");
-            bridgeBrokers("BrokerC", "BrokerB");
+            bridgeBrokers("Broker_B_B", "Broker_A_A");
+            bridgeBrokers("Broker_C_C", "Broker_B_B");
         }
 
         startAllBrokers();
@@ -88,26 +89,26 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("BrokerA");
+        Session ses = createSession("Broker_A_A");
         MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
         MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
 
         // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
 
-        sendMessages("BrokerC", dest, 1);
+        sendMessages("Broker_C_C", dest, 1);
         assertNotNull(clientA.receive(1000));
         assertNotNull(clientB.receive(1000));
 
         //bring online a consumer on the other side
-        Session ses2 = createSession("BrokerC");
+        Session ses2 = createSession("Broker_C_C");
         MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
         //there will be 2 network durables, 1 for each direction of the bridge
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
 
         clientA.close();
         clientB.close();
@@ -116,9 +117,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         ses.unsubscribe("subB");
         ses2.unsubscribe("subC");
 
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
 
     }
 
@@ -134,11 +135,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
 
     protected void testDurablePropagationConsumerAllBrokers() throws Exception {
         // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("Broker_A_A", "Broker_B_B");
+        bridgeBrokers("Broker_B_B", "Broker_C_C");
         if (!duplex) {
-            bridgeBrokers("BrokerB", "BrokerA");
-            bridgeBrokers("BrokerC", "BrokerB");
+            bridgeBrokers("Broker_B_B", "Broker_A_A");
+            bridgeBrokers("Broker_C_C", "Broker_B_B");
         }
 
         startAllBrokers();
@@ -147,28 +148,28 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("BrokerA");
+        Session ses = createSession("Broker_A_A");
         MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
 
         // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
 
         //bring online a consumer on the other side
-        Session ses2 = createSession("BrokerB");
+        Session ses2 = createSession("Broker_B_B");
         MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
 
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
 
-        Session ses3 = createSession("BrokerC");
+        Session ses3 = createSession("Broker_C_C");
         MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
 
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
 
 
         clientA.close();
@@ -179,9 +180,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         ses3.unsubscribe("subC");
 
 
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
 
     }
 
@@ -197,15 +198,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
 
     protected void testDurablePropagation5Broker() throws Exception {
         // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
-        bridgeBrokers("BrokerC", "BrokerD");
-        bridgeBrokers("BrokerD", "BrokerE");
+        bridgeBrokers("Broker_A_A", "Broker_B_B");
+        bridgeBrokers("Broker_B_B", "Broker_C_C");
+        bridgeBrokers("Broker_C_C", "Broker_D_D");
+        bridgeBrokers("Broker_D_D", "Broker_E_E");
         if (!duplex) {
-            bridgeBrokers("BrokerB", "BrokerA");
-            bridgeBrokers("BrokerC", "BrokerB");
-            bridgeBrokers("BrokerD", "BrokerC");
-            bridgeBrokers("BrokerE", "BrokerD");
+            bridgeBrokers("Broker_B_B", "Broker_A_A");
+            bridgeBrokers("Broker_C_C", "Broker_B_B");
+            bridgeBrokers("Broker_D_D", "Broker_C_C");
+            bridgeBrokers("Broker_E_E", "Broker_D_D");
         }
 
         startAllBrokers();
@@ -214,42 +215,42 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("BrokerA");
+        Session ses = createSession("Broker_A_A");
         MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
         Thread.sleep(1000);
 
         // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
 
-        sendMessages("BrokerE", dest, 1);
+        sendMessages("Broker_E_E", dest, 1);
         assertNotNull(clientA.receive(1000));
 
         //bring online a consumer on the other side
-        Session ses2 = createSession("BrokerE");
+        Session ses2 = createSession("Broker_E_E");
         MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
         Thread.sleep(1000);
 
         //there will be 2 network durables, 1 for each direction of the bridge
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
 
         clientA.close();
         clientE.close();
         ses.unsubscribe("subA");
         ses2.unsubscribe("subE");
 
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
 
     }
 
@@ -265,13 +266,13 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
 
     protected void testDurablePropagationSpoke() throws Exception {
         // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
-        bridgeBrokers("BrokerB", "BrokerD");
+        bridgeBrokers("Broker_A_A", "Broker_B_B");
+        bridgeBrokers("Broker_B_B", "Broker_C_C");
+        bridgeBrokers("Broker_B_B", "Broker_D_D");
         if (!duplex) {
-            bridgeBrokers("BrokerB", "BrokerA");
-            bridgeBrokers("BrokerC", "BrokerB");
-            bridgeBrokers("BrokerD", "BrokerB");
+            bridgeBrokers("Broker_B_B", "Broker_A_A");
+            bridgeBrokers("Broker_C_C", "Broker_B_B");
+            bridgeBrokers("Broker_D_D", "Broker_B_B");
         }
 
         startAllBrokers();
@@ -280,42 +281,42 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("BrokerA");
-        Session ses2 = createSession("BrokerB");
-        Session ses3 = createSession("BrokerC");
-        Session ses4 = createSession("BrokerD");
+        Session ses = createSession("Broker_A_A");
+        Session ses2 = createSession("Broker_B_B");
+        Session ses3 = createSession("Broker_C_C");
+        Session ses4 = createSession("Broker_D_D");
 
         MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
         MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
         Thread.sleep(1000);
 
         // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
 
         MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
         Thread.sleep(1000);
 
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
 
-        sendMessages("BrokerA", dest, 1);
+        sendMessages("Broker_A_A", dest, 1);
         assertNotNull(clientD.receive(1000));
-        sendMessages("BrokerC", dest, 1);
+        sendMessages("Broker_C_C", dest, 1);
         assertNotNull(clientD.receive(1000));
 
         MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
         MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
         Thread.sleep(1000);
 
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 3);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 3);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
 
         clientA.close();
         clientAB.close();
@@ -329,10 +330,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         ses3.unsubscribe("subC");
         ses4.unsubscribe("subD");
 
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
     }
 
     public void testForceDurablePropagationDuplex() throws Exception {
@@ -347,11 +348,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
 
     protected void testForceDurablePropagation() throws Exception {
         // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("Broker_A_A", "Broker_B_B");
+        bridgeBrokers("Broker_B_B", "Broker_C_C");
         if (!duplex) {
-            bridgeBrokers("BrokerB", "BrokerA");
-            bridgeBrokers("BrokerC", "BrokerB");
+            bridgeBrokers("Broker_B_B", "Broker_A_A");
+            bridgeBrokers("Broker_C_C", "Broker_B_B");
         }
 
         startAllBrokers();
@@ -360,32 +361,32 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("BrokerA");
+        Session ses = createSession("Broker_A_A");
         MessageConsumer clientA = ses.createConsumer(dest);
         Thread.sleep(1000);
 
         // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
 
-        sendMessages("BrokerC", dest, 1);
+        sendMessages("Broker_C_C", dest, 1);
         assertNotNull(clientA.receive(1000));
 
-        Session ses2 = createSession("BrokerC");
+        Session ses2 = createSession("Broker_C_C");
         MessageConsumer clientC = ses2.createConsumer(dest);
         Thread.sleep(1000);
 
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
 
         clientA.close();
         clientC.close();
 
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
     }
 
     public void testDurablePropagationSyncDuplex() throws Exception {
@@ -400,14 +401,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
 
     protected void testDurablePropagationSync() throws Exception {
         // Setup broker networks
-        NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
-        NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
+        NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B");
+        NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C");
 
         NetworkConnector nc3 = null;
         NetworkConnector nc4 = null;
         if (!duplex) {
-            nc3 = bridgeBrokers("BrokerB", "BrokerA");
-            nc4 = bridgeBrokers("BrokerC", "BrokerB");
+            nc3 = bridgeBrokers("Broker_B_B", "Broker_A_A");
+            nc4 = bridgeBrokers("Broker_C_C", "Broker_B_B");
         }
 
         startAllBrokers();
@@ -424,16 +425,16 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("BrokerA");
-        Session ses2 = createSession("BrokerC");
+        Session ses = createSession("Broker_A_A");
+        Session ses2 = createSession("Broker_C_C");
         MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
         MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
         MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
         Thread.sleep(1000);
 
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
 
         nc1.start();
         nc2.start();
@@ -443,9 +444,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         }
 
         //there will be 2 network durables, 1 for each direction of the bridge
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
 
         clientA.close();
         clientB.close();
@@ -456,14 +457,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         duplex = true;
 
         // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("Broker_A_A", "Broker_B_B");
+        bridgeBrokers("Broker_B_B", "Broker_C_C");
 
         //Duplicate the bridges with different included destinations - valid use case
-        NetworkConnector nc3 = bridgeBrokers("BrokerA", "BrokerB");
-        NetworkConnector nc4 = bridgeBrokers("BrokerB", "BrokerC");
-        nc3.setName("nc3");
-        nc4.setName("nc4");
+        NetworkConnector nc3 = bridgeBrokers("Broker_A_A", "Broker_B_B");
+        NetworkConnector nc4 = bridgeBrokers("Broker_B_B", "Broker_C_C");
+        nc3.setName("nc_3_3");
+        nc4.setName("nc_4_4");
         nc3.setDynamicallyIncludedDestinations(
                 Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO2?forceDurable=true")));
         nc4.setDynamicallyIncludedDestinations(
@@ -476,8 +477,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         ActiveMQTopic dest2 = (ActiveMQTopic) createDestination("TEST.FOO2", true);
 
         // Setup consumers
-        Session ses = createSession("BrokerA");
-        Session ses2 = createSession("BrokerC");
+        Session ses = createSession("Broker_A_A");
+        Session ses2 = createSession("Broker_C_C");
         MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
         MessageConsumer clientAa = ses.createDurableSubscriber(dest2, "subAa");
         MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
@@ -485,33 +486,33 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         Thread.sleep(1000);
 
         //make sure network durables are online
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 1);
 
         clientA.close();
         clientC.close();
         ses.unsubscribe("subA");
         ses2.unsubscribe("subC");
 
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 1);
 
         clientAa.close();
         clientCc.close();
         ses.unsubscribe("subAa");
         ses2.unsubscribe("subCc");
 
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 0);
     }
 
     protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
@@ -552,11 +553,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         super.setAutoFail(true);
         super.setUp();
         String options = new String("?persistent=false&useJmx=false");
-        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
-        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
-        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
-        createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options));
-        createBroker(new URI("broker:(tcp://localhost:61620)/BrokerE" + options));
+        createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
+        createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
+        createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
+        createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options));
+        createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options));
     }
 
     @Override


[2/6] activemq git commit: AMQ-6858 - reworking durable subscription propagation fix

Posted by cs...@apache.org.
AMQ-6858 - reworking durable subscription propagation fix

Significantly reworking previous fix so that the client id is properly
changed when tracking network proxy subscriptions. This makes it so
removal is done properly

(cherry picked from commit 41211c78d19b545a2352584d3598346aa3705be4)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/97fe20a5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/97fe20a5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/97fe20a5

Branch: refs/heads/activemq-5.15.x
Commit: 97fe20a5721a39b70f841f303024fa30352d7336
Parents: d5a987b
Author: Christopher L. Shannon <ch...@gmail.com>
Authored: Sun Nov 12 15:37:40 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Nov 16 07:50:19 2017 -0500

----------------------------------------------------------------------
 .../apache/activemq/network/ConduitBridge.java  |   8 +-
 .../network/DemandForwardingBridgeSupport.java  |  86 ++-
 .../activemq/network/DemandSubscription.java    |   8 +
 .../activemq/network/DurableConduitBridge.java  |   6 +-
 .../DurableFiveBrokerNetworkBridgeTest.java     | 576 +++++++++++++++++++
 .../DurableThreeBrokerNetworkBridgeTest.java    | 241 --------
 6 files changed, 659 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 6ced896..bc9d004 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -80,7 +81,12 @@ public class ConduitBridge extends DemandForwardingBridge {
                         ds.addForcedDurableConsumer(info.getConsumerId());
                     }
                 } else {
-                    ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
+                	if (isProxyNSConsumer(info)) {
+                	    final BrokerId[] path = info.getBrokerPath();
+                	    addProxyNetworkSubscription(ds, path, info.getSubscriptionName());
+                	} else {
+                		ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
+                	}
                 }
                 matched = true;
                 // continue - we want interest to any existing DemandSubscriptions

http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index efdfa5a..03e79e4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
@@ -94,7 +95,6 @@ import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.failover.FailoverTransport;
-import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -666,11 +666,52 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
      * @param info
      * @return
      */
-    protected boolean isBridgeNS(ConsumerInfo info) {
+    protected boolean isDirectBridgeConsumer(ConsumerInfo info) {
         return (info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
                 (info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
     }
 
+    private boolean isProxyBridgeSubscription(SubscriptionInfo info) {
+        if (info.getSubcriptionName() != null && info.getClientId() != null) {
+            if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)
+                    && !info.getClientId().startsWith(configuration.getName())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    protected void addProxyNetworkSubscription(final DemandSubscription sub, final BrokerId[] path, String subName) {
+        if (sub != null && path.length > 1 && subName != null) {
+            String b1 = path[path.length-1].toString();
+            String b2 = path[path.length-2].toString();
+            final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + "_inbound_" + b1, subName);
+            sub.getDurableRemoteSubs().add(newSubInfo);
+            sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet();
+            LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
+        } else {
+            LOG.debug("Skipping addProxyNetworkSubscription");
+        }
+    }
+
+    private String getProxyBridgeClientId(SubscriptionInfo info) {
+        String[] clientIdTokens = info.getClientId().split("_");
+        String newClientId = "";
+        if (clientIdTokens.length > 2) {
+            for (int j = clientIdTokens.length - 3; j < clientIdTokens.length; j++) {
+                newClientId += clientIdTokens[j];
+                if (j < clientIdTokens.length -1) {
+                    newClientId += "_";
+                }
+            }
+        }
+        return newClientId;
+    }
+
+    protected boolean isProxyNSConsumer(ConsumerInfo info) {
+        return info.getBrokerPath() != null && info.getBrokerPath().length > 1;
+    }
+
     protected void serviceRemoteCommand(Command command) {
         if (!disposed.get()) {
             try {
@@ -706,7 +747,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                                                     //dynamicallyIncludedDestinations list
                                                     //Also re-add network consumers that are not part of this direct
                                                     //bridge (proxy of proxy bridges)
-                                                    if((info.getSubscriptionName() == null || !isBridgeNS(info)) &&
+                                                    if((info.getSubscriptionName() == null || !isDirectBridgeConsumer(info)) &&
                                                             NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
                                                         serviceRemoteConsumerAdvisory(info);
                                                     }
@@ -975,8 +1016,22 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                 DemandSubscription ds = i.next();
                 boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
+
                 if (removed) {
                     cleanupDurableSub(ds, i);
+                //If this is a proxy bridge subscription we need to try changing the clientId
+                } else if (!removed && isProxyBridgeSubscription(subscriptionInfo)){
+                    subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo));
+                    if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) {
+                        AtomicInteger count = ds.getNetworkDemandConsumerMap().computeIfAbsent(subscriptionInfo, v -> new AtomicInteger());
+                        count.decrementAndGet();
+                        //Only remove the durable remote sub if the count <= 0
+                        if (count.get() <= 0) {
+                            ds.getDurableRemoteSubs().remove(subscriptionInfo);
+                            ds.getNetworkDemandConsumerMap().remove(subscriptionInfo);
+                            cleanupDurableSub(ds, i);
+                        }
+                    }
                 }
             }
         }
@@ -984,6 +1039,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     private void cleanupDurableSub(final DemandSubscription ds,
             Iterator<DemandSubscription> i) throws IOException {
+
         if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
                 && ds.getForcedDurableConsumersSize() == 0) {
             // deactivate subscriber
@@ -998,9 +1054,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             localBroker.oneway(sending);
 
             //remove subscriber from map
-            if (i != null) {
-                i.remove();
-            }
+            i.remove();
         }
     }
 
@@ -1094,18 +1148,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 public void run() {
                     sub.waitForCompletion();
                     try {
-                        //If removing a network durable subscription that still has durable remote subs
-                        //make sure we cleanup the durable subscription properly - necessary when using
-                        //durable subscriptions and 3 or more brokers
-                        if (configuration.isConduitSubscriptions() &&
-                                sub.getLocalInfo().getSubscriptionName() != null &&
-                                sub.getLocalInfo().getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
-                                sub.getDurableRemoteSubs().size() > 0) {
-                            sub.getDurableRemoteSubs().clear();
-                            cleanupDurableSub(sub, null);
-                        } else {
-                            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
-                        }
+                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
                     } catch (IOException e) {
                         LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
                     }
@@ -1367,7 +1410,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 undoMapRegistration(sub);
             } else {
                 if (consumerInfo.isDurable()) {
-                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
+                	if (isProxyNSConsumer(sub.getRemoteInfo())) {
+                		BrokerId[] path = sub.getRemoteInfo().getBrokerPath();
+                		addProxyNetworkSubscription(sub, path, consumerInfo.getSubscriptionName());
+                	} else {
+            			sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
+            		}
                 }
                 addSubscription(sub);
                 LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);

http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
index 371df0a..96a9baf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.network;
 
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,6 +44,8 @@ public class DemandSubscription {
     private final AtomicBoolean activeWaiter = new AtomicBoolean();
     private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
     private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
+    //Used for proxy network consumers
+    private final Map<SubscriptionInfo, AtomicInteger> networkDemandConsumerMap = new ConcurrentHashMap<>();
     private SubscriptionInfo localDurableSubscriber;
 
     private NetworkBridgeFilter networkBridgeFilter;
@@ -83,6 +87,10 @@ public class DemandSubscription {
         return durableRemoteSubs;
     }
 
+    public Map<SubscriptionInfo, AtomicInteger> getNetworkDemandConsumerMap() {
+        return networkDemandConsumerMap;
+    }
+
     /**
      * @return true if there are no interested consumers
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 42f30a4..8d14f74 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -142,12 +142,8 @@ public class DurableConduitBridge extends ConduitBridge {
             info.setSubscriptionName(getSubscriberName(info.getDestination()));
             // and override the consumerId with something unique so that it won't
             // be removed if the durable subscriber (at the other end) goes away
-            //Only do this for direct bridge consumers - proxy network consumers we don't
-            //want to replace the consumerId or cleanup won't happen properly
-            if (info.getBrokerPath().length == 1 || (info.getBrokerPath().length > 1 && info.getBrokerPath()[0] == remoteBrokerPath[0])) {
-                info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
+           info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
                                consumerIdGenerator.getNextSequenceId()));
-            }
         }
         info.setSelector(null);
         DemandSubscription demandSubscription = doCreateDemandSubscription(info);

http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
new file mode 100644
index 0000000..4a63553
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Test;
+
+/**
+ * Test to make sure durable subscriptions propagate properly throughout network bridges
+ * and that conduit subscriptions work properly
+ */
+public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
+
+    private boolean duplex = true;
+
+    @Override
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
+        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName);
+        connector.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true")));
+        connector.setDuplex(duplex);
+        connector.setDecreaseNetworkConsumerPriority(false);
+        connector.setConduitSubscriptions(true);
+        connector.setSyncDurableSubs(true);
+        connector.setNetworkTTL(-1);
+        return connector;
+    }
+
+    public void testDurablePropagationDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagation();
+    }
+
+    public void testDurablePropagationOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagation();
+    }
+
+    /**
+     * BrokerA -> BrokerB -> BrokerC
+     */
+    protected void testDurablePropagation() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientA.receive(1000));
+        assertNotNull(clientB.receive(1000));
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses.unsubscribe("subB");
+        ses2.unsubscribe("subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+    }
+
+    public void testDurablePropagationConsumerAllBrokersDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagationConsumerAllBrokers();
+    }
+
+    public void testDurablePropagationConsumerAllBrokersOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagationConsumerAllBrokers();
+    }
+
+    protected void testDurablePropagationConsumerAllBrokers() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerB");
+        MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        Session ses3 = createSession("BrokerC");
+        MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses2.unsubscribe("subB");
+        ses3.unsubscribe("subC");
+
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+    }
+
+    public void testDurablePropagation5BrokerDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagation5Broker();
+    }
+
+    public void testDurablePropagation5BrokerOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagation5Broker();
+    }
+
+    protected void testDurablePropagation5Broker() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerC", "BrokerD");
+        bridgeBrokers("BrokerD", "BrokerE");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+            bridgeBrokers("BrokerD", "BrokerC");
+            bridgeBrokers("BrokerE", "BrokerD");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        Thread.sleep(1000);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerE", dest, 1);
+        assertNotNull(clientA.receive(1000));
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerE");
+        MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+        Thread.sleep(1000);
+
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientE.close();
+        ses.unsubscribe("subA");
+        ses2.unsubscribe("subE");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 0);
+
+    }
+
+    public void testDurablePropagationSpokeDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagationSpoke();
+    }
+
+    public void testDurablePropagationSpokeOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagationSpoke();
+    }
+
+    protected void testDurablePropagationSpoke() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerB", "BrokerD");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+            bridgeBrokers("BrokerD", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerB");
+        Session ses3 = createSession("BrokerC");
+        Session ses4 = createSession("BrokerD");
+
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
+        Thread.sleep(1000);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        sendMessages("BrokerA", dest, 1);
+        assertNotNull(clientD.receive(1000));
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientD.receive(1000));
+
+        MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
+        MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 3);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientAB.close();
+        clientB.close();
+        clientC.close();
+        clientD.close();
+
+        ses.unsubscribe("subA");
+        ses.unsubscribe("subAB");
+        ses2.unsubscribe("subB");
+        ses3.unsubscribe("subC");
+        ses4.unsubscribe("subD");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
+    }
+
+    public void testForceDurablePropagationDuplex() throws Exception {
+        duplex = true;
+        testForceDurablePropagation();
+    }
+
+    public void testForceDurablePropagationOneWay() throws Exception {
+        duplex = false;
+        testForceDurablePropagation();
+    }
+
+    protected void testForceDurablePropagation() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createConsumer(dest);
+        Thread.sleep(1000);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientA.receive(1000));
+
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientC = ses2.createConsumer(dest);
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientC.close();
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+    }
+
+    public void testDurablePropagationSyncDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagationSync();
+    }
+
+    public void testDurablePropagationSyncOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagationSync();
+    }
+
+    protected void testDurablePropagationSync() throws Exception {
+        // Setup broker networks
+        NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
+        NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
+
+        NetworkConnector nc3 = null;
+        NetworkConnector nc4 = null;
+        if (!duplex) {
+            nc3 = bridgeBrokers("BrokerB", "BrokerA");
+            nc4 = bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        nc1.stop();
+        nc2.stop();
+
+        if (!duplex) {
+            nc3.stop();
+            nc4.stop();
+        }
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+        nc1.start();
+        nc2.start();
+        if (!duplex) {
+            nc3.start();
+            nc4.start();
+        }
+
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+    }
+
+    public void testDurablePropagationMultipleBridgesDifferentDestinations() throws Exception {
+        duplex = true;
+
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        //Duplicate the bridges with different included destinations - valid use case
+        NetworkConnector nc3 = bridgeBrokers("BrokerA", "BrokerB");
+        NetworkConnector nc4 = bridgeBrokers("BrokerB", "BrokerC");
+        nc3.setName("nc3");
+        nc4.setName("nc4");
+        nc3.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO2?forceDurable=true")));
+        nc4.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO2?forceDurable=true")));
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+        ActiveMQTopic dest2 = (ActiveMQTopic) createDestination("TEST.FOO2", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientAa = ses.createDurableSubscriber(dest2, "subAa");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        MessageConsumer clientCc = ses2.createDurableSubscriber(dest2, "subCc");
+        Thread.sleep(1000);
+
+        //make sure network durables are online
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
+
+        clientA.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses2.unsubscribe("subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
+
+        clientAa.close();
+        clientCc.close();
+        ses.unsubscribe("subAa");
+        ses2.unsubscribe("subCc");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 0);
+    }
+
+    protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
+            final int count) throws Exception {
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return count == getNCDurableSubs(brokerService, dest).size();
+            }
+        }, 10000, 500));
+    }
+
+    protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
+            final ActiveMQTopic dest) throws Exception {
+        List<DurableTopicSubscription> subs = new ArrayList<>();
+        Destination d = brokerService.getDestination(dest);
+        org.apache.activemq.broker.region.Topic destination = null;
+        if (d instanceof DestinationFilter) {
+            destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
+        } else {
+            destination = (org.apache.activemq.broker.region.Topic) d;
+        }
+
+        for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
+            if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
+                DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
+                if (sub != null) {
+                    subs.add(sub);
+                }
+            }
+        }
+
+        return subs;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        String options = new String("?persistent=false&useJmx=false");
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
+        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
+        createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options));
+        createBroker(new URI("broker:(tcp://localhost:61620)/BrokerE" + options));
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) {
+        broker.setBrokerId(broker.getBrokerName());
+    }
+
+    protected Session createSession(String broker) throws Exception {
+        Connection con = createConnection(broker);
+        con.start();
+        return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public static Test suite() {
+        return suite(DurableFiveBrokerNetworkBridgeTest.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
deleted file mode 100644
index ff09a1c..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.network;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFilter;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.MessageIdList;
-import org.apache.activemq.util.SubscriptionKey;
-import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
-
-import com.google.common.collect.Lists;
-
-import junit.framework.Test;
-
-/**
- * Test to make sure durable subscriptions propagate properly throughout network bridges
- * and that conduit subscriptions work properly
- */
-public class DurableThreeBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
-
-    @Override
-    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
-        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName);
-        connector.setDynamicallyIncludedDestinations(
-                Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true")));
-        connector.setDuplex(true);
-        connector.setDecreaseNetworkConsumerPriority(false);
-        connector.setConduitSubscriptions(true);
-        connector.setSyncDurableSubs(true);
-        connector.setNetworkTTL(-1);
-        return connector;
-    }
-
-    /**
-     * BrokerA -> BrokerB -> BrokerC
-     */
-    public void testDurablePropagation() throws Exception {
-        // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
-
-        startAllBrokers();
-
-        // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
-
-        // Setup consumers
-        Session ses = createSession("BrokerA");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
-
-        // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-
-        sendMessages("BrokerC", dest, 1);
-        assertNotNull(clientA.receive(1000));
-        assertNotNull(clientB.receive(1000));
-
-        //bring online a consumer on the other side
-        Session ses2 = createSession("BrokerC");
-        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
-        //there will be 2 network durables, 1 for each direction of the bridge
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
-
-        clientA.close();
-        clientB.close();
-        clientC.close();
-        ses.unsubscribe("subA");
-        ses.unsubscribe("subB");
-        ses2.unsubscribe("subC");
-
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-
-    }
-
-    public void testForceDurablePropagation() throws Exception {
-        // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
-
-        startAllBrokers();
-
-        // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
-
-        // Setup consumers
-        Session ses = createSession("BrokerA");
-        MessageConsumer clientA = ses.createConsumer(dest);
-
-        // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-
-        sendMessages("BrokerC", dest, 1);
-        assertNotNull(clientA.receive(1000));
-
-        Session ses2 = createSession("BrokerC");
-        MessageConsumer clientC = ses2.createConsumer(dest);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
-
-        clientA.close();
-        clientC.close();
-
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-    }
-
-    public void testDurablePropagationSync() throws Exception {
-        // Setup broker networks
-        NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
-        NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
-
-        startAllBrokers();
-
-        nc1.stop();
-        nc2.stop();
-
-        // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
-
-        // Setup consumers
-        Session ses = createSession("BrokerA");
-        Session ses2 = createSession("BrokerC");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
-        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
-
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-
-        nc1.start();
-        nc2.start();
-
-        //there will be 2 network durables, 1 for each direction of the bridge
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
-
-        clientA.close();
-        clientB.close();
-        clientC.close();
-    }
-
-
-    protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
-            final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getNCDurableSubs(brokerService, dest).size();
-            }
-        }, 10000, 500));
-    }
-
-    protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
-            final ActiveMQTopic dest) throws Exception {
-        List<DurableTopicSubscription> subs = new ArrayList<>();
-        Destination d = brokerService.getDestination(dest);
-        org.apache.activemq.broker.region.Topic destination = null;
-        if (d instanceof DestinationFilter) {
-            destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
-        } else {
-            destination = (org.apache.activemq.broker.region.Topic) d;
-        }
-
-        for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
-            if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
-                DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
-                if (sub != null) {
-                    subs.add(sub);
-                }
-            }
-        }
-
-        return subs;
-    }
-
-    @Override
-    public void setUp() throws Exception {
-        super.setAutoFail(true);
-        super.setUp();
-        String options = new String("?persistent=false&useJmx=false");
-        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
-        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
-        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
-    }
-
-    @Override
-    protected void configureBroker(BrokerService broker) {
-        broker.setBrokerId(broker.getBrokerName());
-    }
-
-    protected Session createSession(String broker) throws Exception {
-        Connection con = createConnection(broker);
-        con.start();
-        return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    }
-
-    public static Test suite() {
-        return suite(DurableThreeBrokerNetworkBridgeTest.class);
-    }
-}


[6/6] activemq git commit: (AMQ-6858) - add extra precautionary check for demand subs

Posted by cs...@apache.org.
(AMQ-6858) - add extra precautionary check for demand subs

also remove test that intermittently fails with timing issues

(cherry picked from commit 08aa5118ffcd8dfc4ec813d799a61ef1ee4421d1)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1cfc9ff9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1cfc9ff9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1cfc9ff9

Branch: refs/heads/activemq-5.15.x
Commit: 1cfc9ff9a6d0682bd8fccf278924d15a908ccb31
Parents: 3285be6
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Nov 16 07:44:33 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Nov 16 07:50:53 2017 -0500

----------------------------------------------------------------------
 .../apache/activemq/network/ConduitBridge.java  |  3 ++-
 .../network/DemandForwardingBridgeSupport.java  |  3 ++-
 .../DurableFiveBrokerNetworkBridgeTest.java     | 23 +-------------------
 3 files changed, 5 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1cfc9ff9/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 70f45f7..a4b5072 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -83,7 +83,8 @@ public class ConduitBridge extends DemandForwardingBridge {
                 } else {
                     //Handle the demand generated by proxy network subscriptions
                     //The broker path is case is normal
-                    if (isProxyNSConsumerBrokerPath(info)) {
+                    if (isProxyNSConsumerBrokerPath(info) &&
+                            info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) {
                         final BrokerId[] path = info.getBrokerPath();
                         addProxyNetworkSubscriptionBrokerPath(ds, path, info.getSubscriptionName());
                     //This is the durable sync case on broker restart

http://git-wip-us.apache.org/repos/asf/activemq/blob/1cfc9ff9/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index df493c3..dd7716f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1432,7 +1432,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 if (consumerInfo.isDurable()) {
                     //Handle the demand generated by proxy network subscriptions
                     //The broker path is case is normal
-                    if (isProxyNSConsumerBrokerPath(sub.getRemoteInfo())) {
+                    if (isProxyNSConsumerBrokerPath(sub.getRemoteInfo()) &&
+                            info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) {
                         final BrokerId[] path = info.getBrokerPath();
                         addProxyNetworkSubscriptionBrokerPath(sub, path, consumerInfo.getSubscriptionName());
                     //This is the durable sync case on broker restart

http://git-wip-us.apache.org/repos/asf/activemq/blob/1cfc9ff9/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 5bbd8b8..fe07ac3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -63,30 +63,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         return connector;
     }
 
-    public void testDurablePropagationBrokerRestartDuplex() throws Exception {
+    public void testDurablePropagationBrokerRestart() throws Exception {
         duplex = true;
-        testDurablePropagationBrokerRestart();
-    }
 
-    public void testDurablePropagationBrokerRestartOneWay() throws Exception {
-        duplex = false;
-        testDurablePropagationBrokerRestart();
-    }
-
-    protected void testDurablePropagationBrokerRestart() throws Exception {
         // Setup broker networks
         bridgeBrokers("Broker_A_A", "Broker_B_B");
         bridgeBrokers("Broker_B_B", "Broker_C_C");
         bridgeBrokers("Broker_C_C", "Broker_D_D");
         bridgeBrokers("Broker_D_D", "Broker_E_E");
 
-        if (!duplex) {
-            bridgeBrokers("Broker_B_B", "Broker_A_A");
-            bridgeBrokers("Broker_C_C", "Broker_B_B");
-            bridgeBrokers("Broker_D_D", "Broker_C_C");
-            bridgeBrokers("Broker_E_E", "Broker_D_D");
-        }
-
         startAllBrokers();
 
         // Setup destination
@@ -139,12 +124,6 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         bridgeBrokers("Broker_B_B", "Broker_C_C");
         bridgeBrokers("Broker_C_C", "Broker_D_D");
         bridgeBrokers("Broker_D_D", "Broker_E_E");
-        if (!duplex) {
-            bridgeBrokers("Broker_B_B", "Broker_A_A");
-            bridgeBrokers("Broker_C_C", "Broker_B_B");
-            bridgeBrokers("Broker_D_D", "Broker_C_C");
-            bridgeBrokers("Broker_E_E", "Broker_D_D");
-        }
 
         startAllBrokers();