You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2017/11/09 16:23:14 UTC
activemq git commit: AMQ-6858 - Fix several durable subscription
bridge propagation issues
Repository: activemq
Updated Branches:
refs/heads/master 7dad09a9c -> 96ce14b27
AMQ-6858 - Fix several durable subscription bridge propagation issues
Durable network proxy subs will now be properly created across multiple
bridges when 3 or more brokers are used. Demand will be properly synced
and removed.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/96ce14b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/96ce14b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/96ce14b2
Branch: refs/heads/master
Commit: 96ce14b278fef9e5f428f1c3c07ce5c09fd8f9a8
Parents: 7dad09a
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon Nov 6 08:42:03 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Nov 9 11:21:18 2017 -0500
----------------------------------------------------------------------
.../apache/activemq/network/ConduitBridge.java | 16 +-
.../network/DemandForwardingBridgeSupport.java | 49 +++-
.../activemq/network/DurableConduitBridge.java | 19 +-
.../network/NetworkBridgeConfiguration.java | 13 +
.../DurableThreeBrokerNetworkBridgeTest.java | 241 +++++++++++++++++++
.../VerifyNetworkConsumersDisconnectTest.java | 68 +++++-
6 files changed, 384 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 3c0b85b..6ced896 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -56,12 +56,16 @@ public class ConduitBridge extends DemandForwardingBridge {
}
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) {
- // search through existing subscriptions and see if we have a match
- if (info.isNetworkSubscription()) {
+ //If a network subscription and a queue check if isConduitNetworkQueueSubscriptions is true
+ //If true then we want to try and conduit
+ //For topics we always want to conduit regardless of network subscription or not
+ if (info.isNetworkSubscription() && info.getDestination().isQueue() &&
+ !configuration.isConduitNetworkQueueSubscriptions()) {
return false;
}
boolean matched = false;
+ // search through existing subscriptions and see if we have a match
for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
if (canConduit(ds) && filter.matches(info.getDestination())) {
@@ -86,9 +90,13 @@ public class ConduitBridge extends DemandForwardingBridge {
}
// we want to conduit statically included consumers which are local networkSubs
- // but we don't want to conduit remote network subs i.e. (proxy proxy) consumers
+ // but we don't want to conduit remote network queue subs i.e. (proxy proxy) consumers
+ // unless isConduitNetworkQueueSubscriptions is true
+ // We always want to conduit topic subscriptions
private boolean canConduit(DemandSubscription ds) {
- return ds.isStaticallyIncluded() || !ds.getRemoteInfo().isNetworkSubscription();
+ return ds.isStaticallyIncluded() || ds.getRemoteInfo().getDestination().isTopic() ||
+ !ds.getRemoteInfo().isNetworkSubscription() ||
+ (ds.getRemoteInfo().getDestination().isQueue() && configuration.isConduitNetworkQueueSubscriptions());
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 879ab39..efdfa5a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -661,6 +661,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
+ /**
+ * Checks whether or not this consumer is a direct bridge network subscription
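+ * @param info the consumer info to check
+ * @return true if the subscription name starts with DURABLE_SUB_PREFIX and the client id is null or starts with this bridge's configured name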
+ */
+ protected boolean isBridgeNS(ConsumerInfo info) {
+ return (info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
+ (info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
+ }
+
protected void serviceRemoteCommand(Command command) {
if (!disposed.get()) {
try {
@@ -694,7 +704,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
//re-add and process any non-NC consumers that match the
//dynamicallyIncludedDestinations list
- if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
+ //Also re-add network consumers that are not part of this direct
+ //bridge (proxy of proxy bridges)
+ if((info.getSubscriptionName() == null || !isBridgeNS(info)) &&
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
serviceRemoteConsumerAdvisory(info);
}
@@ -986,7 +998,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localBroker.oneway(sending);
//remove subscriber from map
- i.remove();
+ if (i != null) {
+ i.remove();
+ }
}
}
@@ -1072,7 +1086,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
- // continue removal in separate thread to free up this thread for outstanding responses
+ // continue removal in separate thread to free up this thread for outstanding responses
// Serialize with removeDestination operations so that removeSubs are serialized with
// removeDestinations such that all removeSub advisories are generated
serialExecutor.execute(new Runnable() {
@@ -1080,7 +1094,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
public void run() {
sub.waitForCompletion();
try {
- localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+ //If removing a network durable subscription that still has durable remote subs
+ //make sure we cleanup the durable subscription properly - necessary when using
+ //durable subscriptions and 3 or more brokers
+ if (configuration.isConduitSubscriptions() &&
+ sub.getLocalInfo().getSubscriptionName() != null &&
+ sub.getLocalInfo().getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
+ sub.getDurableRemoteSubs().size() > 0) {
+ sub.getDurableRemoteSubs().clear();
+ cleanupDurableSub(sub, null);
+ } else {
+ localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+ }
} catch (IOException e) {
LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
}
@@ -1315,13 +1340,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
for (ActiveMQDestination dest : dests) {
if (isPermissableDestination(dest)) {
DemandSubscription sub = createDemandSubscription(dest, null);
- sub.setStaticallyIncluded(true);
- try {
- addSubscription(sub);
- } catch (IOException e) {
- LOG.error("Failed to add static destination {}", dest, e);
+ if (sub != null) {
+ sub.setStaticallyIncluded(true);
+ try {
+ addSubscription(sub);
+ } catch (IOException e) {
+ LOG.error("Failed to add static destination {}", dest, e);
+ }
+ LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
+ } else {
+ LOG.info("{}, static destination excluded: {}, demand already exists", configuration.getBrokerName(), dest);
}
- LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
} else {
LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index fb2b6c9..42f30a4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -75,12 +75,15 @@ public class DurableConduitBridge extends ConduitBridge {
String candidateSubName = getSubscriberName(dest);
for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
String subName = subscription.getConsumerInfo().getSubscriptionName();
- if (subName != null && subName.equals(candidateSubName)) {
+ String clientId = subscription.getContext().getClientId();
+ if (subName != null && subName.equals(candidateSubName) && clientId.startsWith(configuration.getName())) {
DemandSubscription sub = createDemandSubscription(dest, subName);
- sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
- sub.setStaticallyIncluded(true);
- addSubscription(sub);
- break;
+ if (sub != null) {
+ sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
+ sub.setStaticallyIncluded(true);
+ addSubscription(sub);
+ break;
+ }
}
}
}
@@ -139,8 +142,12 @@ public class DurableConduitBridge extends ConduitBridge {
info.setSubscriptionName(getSubscriberName(info.getDestination()));
// and override the consumerId with something unique so that it won't
// be removed if the durable subscriber (at the other end) goes away
- info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
+ //Only do this for direct bridge consumers - proxy network consumers we don't
+ //want to replace the consumerId or cleanup won't happen properly
+ if (info.getBrokerPath().length == 1 || (info.getBrokerPath().length > 1 && info.getBrokerPath()[0] == remoteBrokerPath[0])) {
+ info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
consumerIdGenerator.getNextSequenceId()));
+ }
}
info.setSelector(null);
DemandSubscription demandSubscription = doCreateDemandSubscription(info);
http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 9e596d4..b2ca78a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -29,6 +29,11 @@ import org.apache.activemq.command.ConsumerInfo;
public class NetworkBridgeConfiguration {
private boolean conduitSubscriptions = true;
+ /**
+ * Whether or not network subscriptions on queues are eligible to be conduit
+ * Default is false
+ */
+ private boolean conduitNetworkQueueSubscriptions;
private boolean useVirtualDestSubs;
private boolean dynamicOnly;
private boolean syncDurableSubs;
@@ -85,6 +90,14 @@ public class NetworkBridgeConfiguration {
this.conduitSubscriptions = conduitSubscriptions;
}
+ public boolean isConduitNetworkQueueSubscriptions() {
+ return conduitNetworkQueueSubscriptions;
+ }
+
+ public void setConduitNetworkQueueSubscriptions(boolean conduitNetworkQueueSubscriptions) {
+ this.conduitNetworkQueueSubscriptions = conduitNetworkQueueSubscriptions;
+ }
+
/**
* @return the dynamicOnly
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
new file mode 100644
index 0000000..ff09a1c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Test;
+
+/**
+ * Test to make sure durable subscriptions propagate properly throughout network bridges
+ * and that conduit subscriptions work properly
+ */
+public class DurableThreeBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
+
+ @Override
+ protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
+ NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName);
+ connector.setDynamicallyIncludedDestinations(
+ Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true")));
+ connector.setDuplex(true);
+ connector.setDecreaseNetworkConsumerPriority(false);
+ connector.setConduitSubscriptions(true);
+ connector.setSyncDurableSubs(true);
+ connector.setNetworkTTL(-1);
+ return connector;
+ }
+
+ /**
+ * BrokerA -> BrokerB -> BrokerC
+ */
+ public void testDurablePropagation() throws Exception {
+ // Setup broker networks
+ bridgeBrokers("BrokerA", "BrokerB");
+ bridgeBrokers("BrokerB", "BrokerC");
+
+ startAllBrokers();
+
+ // Setup destination
+ ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+ // Setup consumers
+ Session ses = createSession("BrokerA");
+ MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+ MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+
+ // let consumers propagate around the network
+ assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+ sendMessages("BrokerC", dest, 1);
+ assertNotNull(clientA.receive(1000));
+ assertNotNull(clientB.receive(1000));
+
+ //bring online a consumer on the other side
+ Session ses2 = createSession("BrokerC");
+ MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+ //there will be 2 network durables, 1 for each direction of the bridge
+ assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+ clientA.close();
+ clientB.close();
+ clientC.close();
+ ses.unsubscribe("subA");
+ ses.unsubscribe("subB");
+ ses2.unsubscribe("subC");
+
+ assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+ }
+
+ public void testForceDurablePropagation() throws Exception {
+ // Setup broker networks
+ bridgeBrokers("BrokerA", "BrokerB");
+ bridgeBrokers("BrokerB", "BrokerC");
+
+ startAllBrokers();
+
+ // Setup destination
+ ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+ // Setup consumers
+ Session ses = createSession("BrokerA");
+ MessageConsumer clientA = ses.createConsumer(dest);
+
+ // let consumers propagate around the network
+ assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+ sendMessages("BrokerC", dest, 1);
+ assertNotNull(clientA.receive(1000));
+
+ Session ses2 = createSession("BrokerC");
+ MessageConsumer clientC = ses2.createConsumer(dest);
+ assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+ clientA.close();
+ clientC.close();
+
+ assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+ }
+
+ public void testDurablePropagationSync() throws Exception {
+ // Setup broker networks
+ NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
+ NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
+
+ startAllBrokers();
+
+ nc1.stop();
+ nc2.stop();
+
+ // Setup destination
+ ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+ // Setup consumers
+ Session ses = createSession("BrokerA");
+ Session ses2 = createSession("BrokerC");
+ MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+ MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+ MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+
+ assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+ nc1.start();
+ nc2.start();
+
+ //there will be 2 network durables, 1 for each direction of the bridge
+ assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+ clientA.close();
+ clientB.close();
+ clientC.close();
+ }
+
+
+ protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
+ final int count) throws Exception {
+ assertTrue(Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return count == getNCDurableSubs(brokerService, dest).size();
+ }
+ }, 10000, 500));
+ }
+
+ protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
+ final ActiveMQTopic dest) throws Exception {
+ List<DurableTopicSubscription> subs = new ArrayList<>();
+ Destination d = brokerService.getDestination(dest);
+ org.apache.activemq.broker.region.Topic destination = null;
+ if (d instanceof DestinationFilter) {
+ destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
+ } else {
+ destination = (org.apache.activemq.broker.region.Topic) d;
+ }
+
+ for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
+ if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
+ DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
+ if (sub != null) {
+ subs.add(sub);
+ }
+ }
+ }
+
+ return subs;
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setAutoFail(true);
+ super.setUp();
+ String options = new String("?persistent=false&useJmx=false");
+ createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
+ createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
+ createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
+ }
+
+ @Override
+ protected void configureBroker(BrokerService broker) {
+ broker.setBrokerId(broker.getBrokerName());
+ }
+
+ protected Session createSession(String broker) throws Exception {
+ Connection con = createConnection(broker);
+ con.start();
+ return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public static Test suite() {
+ return suite(DurableThreeBrokerNetworkBridgeTest.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
index 9eeb28c..d04e1f5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
@@ -61,14 +61,19 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
unhandledExceptions.isEmpty());
}
- public NetworkConnector bridge(String from, String to) throws Exception {
+ public NetworkConnector bridge(String from, String to, boolean conduitNetworkQueueSubscriptions) throws Exception {
NetworkConnector networkConnector = bridgeBrokers(from, to, true, NETWORK_TTL, CONDUIT);
networkConnector.setSuppressDuplicateQueueSubscriptions(true);
networkConnector.setDecreaseNetworkConsumerPriority(true);
networkConnector.setDuplex(DUPLEX);
+ networkConnector.setConduitNetworkQueueSubscriptions(conduitNetworkQueueSubscriptions);
return networkConnector;
}
+ public NetworkConnector bridge(String from, String to) throws Exception {
+ return bridge(from, to, false);
+ }
+
/*why conduit proxy proxy consumers gets us in a knot w.r.t removal
DC-7 for CA-9, add DB-15, remove CA-9, add CB-8
CB-8 add DC-7
@@ -137,6 +142,63 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
}
+ public void testConsumerOnEachBrokerNetworkQueueConduitSubs() throws Exception {
+ bridge("Broker0", "Broker1", true);
+ if (!DUPLEX) bridge("Broker1", "Broker0", true);
+
+ bridge("Broker1", "Broker2", true);
+ if (!DUPLEX) bridge("Broker2", "Broker1", true);
+
+ startAllBrokers();
+ waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
+ waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
+ waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
+ waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
+
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ for (int i = 0; i < BROKER_COUNT; i++) {
+ consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest));
+ }
+
+ //conduitNetworkQueueSubscriptions is true so there should only be 2 subs
+ assertExactConsumersConnect("Broker0", 2, 1, TIMEOUT);
+ assertExactConsumersConnect("Broker2", 2, 1, TIMEOUT);
+ // still should be 3 subs for the middle broker, 1 for each direction
+ assertExactConsumersConnect("Broker1", 3, 1, TIMEOUT);
+
+ assertNoUnhandledExceptions();
+
+ LOG.info("Complete the mesh - 0->2");
+
+ // shorter route
+ NetworkConnector nc = bridge("Broker0", "Broker2");
+ nc.setBrokerName("Broker0");
+ nc.start();
+
+
+ if (!DUPLEX) {
+ LOG.info("... complete the mesh - 2->0");
+ nc = bridge("Broker2", "Broker0");
+ nc.setBrokerName("Broker2");
+ nc.start();
+ }
+
+ // reverse order close
+ consumerMap.get("Consumer:" + 2 + ":0").close();
+ TimeUnit.SECONDS.sleep(1);
+ consumerMap.get("Consumer:" + 1 + ":0").close();
+ TimeUnit.SECONDS.sleep(1);
+ consumerMap.get("Consumer:" + 0 + ":0").close();
+
+ LOG.info("Check for no consumers..");
+ for (int i = 0; i < BROKER_COUNT; i++) {
+ assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
+ }
+
+ }
+
public void testXConsumerOnEachBroker() throws Exception {
bridge("Broker0", "Broker1");
if (!DUPLEX) bridge("Broker1", "Broker0");
@@ -232,13 +294,14 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
}, timeout));
}
+ @Override
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
unhandledExceptions.clear();
Thread.setDefaultUncaughtExceptionHandler(this);
-
+
// Setup n brokers
for (int i = 0; i < BROKER_COUNT; i++) {
createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true&brokerId=Broker" + i));
@@ -256,6 +319,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
brokerService.setDestinationPolicy(policyMap);
}
+ @Override
public void uncaughtException(Thread t, Throwable e) {
synchronized(unhandledExceptions) {
unhandledExceptions.put(t, e);