You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2017/11/14 20:27:34 UTC
activemq git commit: AMQ-6858 - Allow configuration of the client Id
token in network bridge
Repository: activemq
Updated Branches:
refs/heads/master 41211c78d -> 3ca439cad
AMQ-6858 - Allow configuration of the client Id token in network bridge
The token used to separate the parts of a generated local client id in a
network bridge (previously hard-coded as "_") should be configurable, so that
bridge names and broker names can themselves contain underscores if desired.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3ca439ca
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3ca439ca
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3ca439ca
Branch: refs/heads/master
Commit: 3ca439cadaa8c40f506aa1ef683b36624d216254
Parents: 41211c7
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Nov 14 14:26:22 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Nov 14 15:23:26 2017 -0500
----------------------------------------------------------------------
.../network/DemandForwardingBridgeSupport.java | 25 +-
.../network/NetworkBridgeConfiguration.java | 9 +
.../DurableFiveBrokerNetworkBridgeTest.java | 293 ++++++++++---------
3 files changed, 167 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/3ca439ca/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 03e79e4..75084d1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
import javax.management.ObjectName;
@@ -492,7 +493,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localConnectionInfo = new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
- localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
+ localClientId = configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + configuration.getBrokerName();
localConnectionInfo.setClientId(localClientId);
localConnectionInfo.setUserName(configuration.getUserName());
localConnectionInfo.setPassword(configuration.getPassword());
@@ -520,8 +521,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
- duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
- + configuration.getBrokerName());
+ duplexLocalConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + "duplex"
+ + configuration.getClientIdToken() + configuration.getBrokerName());
duplexLocalConnectionInfo.setUserName(configuration.getUserName());
duplexLocalConnectionInfo.setPassword(configuration.getPassword());
@@ -609,7 +610,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
remoteConnectionInfo = new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
- remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
+ remoteConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + configuration.getBrokerName() + configuration.getClientIdToken() + "outbound");
remoteConnectionInfo.setUserName(configuration.getUserName());
remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo);
@@ -685,7 +686,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (sub != null && path.length > 1 && subName != null) {
String b1 = path[path.length-1].toString();
String b2 = path[path.length-2].toString();
- final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + "_inbound_" + b1, subName);
+ final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName);
sub.getDurableRemoteSubs().add(newSubInfo);
sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet();
LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
@@ -695,15 +696,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
private String getProxyBridgeClientId(SubscriptionInfo info) {
- String[] clientIdTokens = info.getClientId().split("_");
- String newClientId = "";
- if (clientIdTokens.length > 2) {
- for (int j = clientIdTokens.length - 3; j < clientIdTokens.length; j++) {
- newClientId += clientIdTokens[j];
- if (j < clientIdTokens.length -1) {
- newClientId += "_";
- }
- }
+ String newClientId = info.getClientId();
+ String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null;
+ if (clientIdTokens != null && clientIdTokens.length > 2) {
+ newClientId = clientIdTokens[clientIdTokens.length - 3] + configuration.getClientIdToken() + "inbound"
+ + configuration.getClientIdToken() + clientIdTokens[clientIdTokens.length -1];
}
return newClientId;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3ca439ca/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index b2ca78a..3c64758 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -59,6 +59,7 @@ public class NetworkBridgeConfiguration {
private String password;
private String destinationFilter = null;
private String name = "NC";
+ private String clientIdToken = "_";
protected List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
protected List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
@@ -191,6 +192,14 @@ public class NetworkBridgeConfiguration {
this.brokerName = brokerName;
}
+ public String getClientIdToken() {
+ return clientIdToken;
+ }
+
+ public void setClientIdToken(String clientIdToken) {
+ this.clientIdToken = clientIdToken;
+ }
+
/**
* @return the networkTTL
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/3ca439ca/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 4a63553..94d7393 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -57,6 +57,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
connector.setConduitSubscriptions(true);
connector.setSyncDurableSubs(true);
connector.setNetworkTTL(-1);
+ connector.setClientIdToken("|");
return connector;
}
@@ -71,15 +72,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
}
/**
- * BrokerA -> BrokerB -> BrokerC
+ * Broker_A_A -> Broker_B_B -> Broker_C_C
*/
protected void testDurablePropagation() throws Exception {
// Setup broker networks
- bridgeBrokers("BrokerA", "BrokerB");
- bridgeBrokers("BrokerB", "BrokerC");
+ bridgeBrokers("Broker_A_A", "Broker_B_B");
+ bridgeBrokers("Broker_B_B", "Broker_C_C");
if (!duplex) {
- bridgeBrokers("BrokerB", "BrokerA");
- bridgeBrokers("BrokerC", "BrokerB");
+ bridgeBrokers("Broker_B_B", "Broker_A_A");
+ bridgeBrokers("Broker_C_C", "Broker_B_B");
}
startAllBrokers();
@@ -88,26 +89,26 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("BrokerA");
+ Session ses = createSession("Broker_A_A");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
// let consumers propagate around the network
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
- sendMessages("BrokerC", dest, 1);
+ sendMessages("Broker_C_C", dest, 1);
assertNotNull(clientA.receive(1000));
assertNotNull(clientB.receive(1000));
//bring online a consumer on the other side
- Session ses2 = createSession("BrokerC");
+ Session ses2 = createSession("Broker_C_C");
MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
//there will be 2 network durables, 1 for each direction of the bridge
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientB.close();
@@ -116,9 +117,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ses.unsubscribe("subB");
ses2.unsubscribe("subC");
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
}
@@ -134,11 +135,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
protected void testDurablePropagationConsumerAllBrokers() throws Exception {
// Setup broker networks
- bridgeBrokers("BrokerA", "BrokerB");
- bridgeBrokers("BrokerB", "BrokerC");
+ bridgeBrokers("Broker_A_A", "Broker_B_B");
+ bridgeBrokers("Broker_B_B", "Broker_C_C");
if (!duplex) {
- bridgeBrokers("BrokerB", "BrokerA");
- bridgeBrokers("BrokerC", "BrokerB");
+ bridgeBrokers("Broker_B_B", "Broker_A_A");
+ bridgeBrokers("Broker_C_C", "Broker_B_B");
}
startAllBrokers();
@@ -147,28 +148,28 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("BrokerA");
+ Session ses = createSession("Broker_A_A");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
// let consumers propagate around the network
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
//bring online a consumer on the other side
- Session ses2 = createSession("BrokerB");
+ Session ses2 = createSession("Broker_B_B");
MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
- Session ses3 = createSession("BrokerC");
+ Session ses3 = createSession("Broker_C_C");
MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
@@ -179,9 +180,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ses3.unsubscribe("subC");
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
}
@@ -197,15 +198,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
protected void testDurablePropagation5Broker() throws Exception {
// Setup broker networks
- bridgeBrokers("BrokerA", "BrokerB");
- bridgeBrokers("BrokerB", "BrokerC");
- bridgeBrokers("BrokerC", "BrokerD");
- bridgeBrokers("BrokerD", "BrokerE");
+ bridgeBrokers("Broker_A_A", "Broker_B_B");
+ bridgeBrokers("Broker_B_B", "Broker_C_C");
+ bridgeBrokers("Broker_C_C", "Broker_D_D");
+ bridgeBrokers("Broker_D_D", "Broker_E_E");
if (!duplex) {
- bridgeBrokers("BrokerB", "BrokerA");
- bridgeBrokers("BrokerC", "BrokerB");
- bridgeBrokers("BrokerD", "BrokerC");
- bridgeBrokers("BrokerE", "BrokerD");
+ bridgeBrokers("Broker_B_B", "Broker_A_A");
+ bridgeBrokers("Broker_C_C", "Broker_B_B");
+ bridgeBrokers("Broker_D_D", "Broker_C_C");
+ bridgeBrokers("Broker_E_E", "Broker_D_D");
}
startAllBrokers();
@@ -214,42 +215,42 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("BrokerA");
+ Session ses = createSession("Broker_A_A");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
Thread.sleep(1000);
// let consumers propagate around the network
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
- sendMessages("BrokerE", dest, 1);
+ sendMessages("Broker_E_E", dest, 1);
assertNotNull(clientA.receive(1000));
//bring online a consumer on the other side
- Session ses2 = createSession("BrokerE");
+ Session ses2 = createSession("Broker_E_E");
MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
Thread.sleep(1000);
//there will be 2 network durables, 1 for each direction of the bridge
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 2);
- assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 2);
- assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientE.close();
ses.unsubscribe("subA");
ses2.unsubscribe("subE");
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
}
@@ -265,13 +266,13 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
protected void testDurablePropagationSpoke() throws Exception {
// Setup broker networks
- bridgeBrokers("BrokerA", "BrokerB");
- bridgeBrokers("BrokerB", "BrokerC");
- bridgeBrokers("BrokerB", "BrokerD");
+ bridgeBrokers("Broker_A_A", "Broker_B_B");
+ bridgeBrokers("Broker_B_B", "Broker_C_C");
+ bridgeBrokers("Broker_B_B", "Broker_D_D");
if (!duplex) {
- bridgeBrokers("BrokerB", "BrokerA");
- bridgeBrokers("BrokerC", "BrokerB");
- bridgeBrokers("BrokerD", "BrokerB");
+ bridgeBrokers("Broker_B_B", "Broker_A_A");
+ bridgeBrokers("Broker_C_C", "Broker_B_B");
+ bridgeBrokers("Broker_D_D", "Broker_B_B");
}
startAllBrokers();
@@ -280,42 +281,42 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("BrokerA");
- Session ses2 = createSession("BrokerB");
- Session ses3 = createSession("BrokerC");
- Session ses4 = createSession("BrokerD");
+ Session ses = createSession("Broker_A_A");
+ Session ses2 = createSession("Broker_B_B");
+ Session ses3 = createSession("Broker_C_C");
+ Session ses4 = createSession("Broker_D_D");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
Thread.sleep(1000);
// let consumers propagate around the network
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
Thread.sleep(1000);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
- sendMessages("BrokerA", dest, 1);
+ sendMessages("Broker_A_A", dest, 1);
assertNotNull(clientD.receive(1000));
- sendMessages("BrokerC", dest, 1);
+ sendMessages("Broker_C_C", dest, 1);
assertNotNull(clientD.receive(1000));
MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
Thread.sleep(1000);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 3);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 3);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientAB.close();
@@ -329,10 +330,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ses3.unsubscribe("subC");
ses4.unsubscribe("subD");
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
}
public void testForceDurablePropagationDuplex() throws Exception {
@@ -347,11 +348,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
protected void testForceDurablePropagation() throws Exception {
// Setup broker networks
- bridgeBrokers("BrokerA", "BrokerB");
- bridgeBrokers("BrokerB", "BrokerC");
+ bridgeBrokers("Broker_A_A", "Broker_B_B");
+ bridgeBrokers("Broker_B_B", "Broker_C_C");
if (!duplex) {
- bridgeBrokers("BrokerB", "BrokerA");
- bridgeBrokers("BrokerC", "BrokerB");
+ bridgeBrokers("Broker_B_B", "Broker_A_A");
+ bridgeBrokers("Broker_C_C", "Broker_B_B");
}
startAllBrokers();
@@ -360,32 +361,32 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("BrokerA");
+ Session ses = createSession("Broker_A_A");
MessageConsumer clientA = ses.createConsumer(dest);
Thread.sleep(1000);
// let consumers propagate around the network
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
- sendMessages("BrokerC", dest, 1);
+ sendMessages("Broker_C_C", dest, 1);
assertNotNull(clientA.receive(1000));
- Session ses2 = createSession("BrokerC");
+ Session ses2 = createSession("Broker_C_C");
MessageConsumer clientC = ses2.createConsumer(dest);
Thread.sleep(1000);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientC.close();
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
}
public void testDurablePropagationSyncDuplex() throws Exception {
@@ -400,14 +401,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
protected void testDurablePropagationSync() throws Exception {
// Setup broker networks
- NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
- NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
+ NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B");
+ NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C");
NetworkConnector nc3 = null;
NetworkConnector nc4 = null;
if (!duplex) {
- nc3 = bridgeBrokers("BrokerB", "BrokerA");
- nc4 = bridgeBrokers("BrokerC", "BrokerB");
+ nc3 = bridgeBrokers("Broker_B_B", "Broker_A_A");
+ nc4 = bridgeBrokers("Broker_C_C", "Broker_B_B");
}
startAllBrokers();
@@ -424,16 +425,16 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("BrokerA");
- Session ses2 = createSession("BrokerC");
+ Session ses = createSession("Broker_A_A");
+ Session ses2 = createSession("Broker_C_C");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
Thread.sleep(1000);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
nc1.start();
nc2.start();
@@ -443,9 +444,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
}
//there will be 2 network durables, 1 for each direction of the bridge
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientB.close();
@@ -456,14 +457,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
duplex = true;
// Setup broker networks
- bridgeBrokers("BrokerA", "BrokerB");
- bridgeBrokers("BrokerB", "BrokerC");
+ bridgeBrokers("Broker_A_A", "Broker_B_B");
+ bridgeBrokers("Broker_B_B", "Broker_C_C");
//Duplicate the bridges with different included destinations - valid use case
- NetworkConnector nc3 = bridgeBrokers("BrokerA", "BrokerB");
- NetworkConnector nc4 = bridgeBrokers("BrokerB", "BrokerC");
- nc3.setName("nc3");
- nc4.setName("nc4");
+ NetworkConnector nc3 = bridgeBrokers("Broker_A_A", "Broker_B_B");
+ NetworkConnector nc4 = bridgeBrokers("Broker_B_B", "Broker_C_C");
+ nc3.setName("nc_3_3");
+ nc4.setName("nc_4_4");
nc3.setDynamicallyIncludedDestinations(
Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO2?forceDurable=true")));
nc4.setDynamicallyIncludedDestinations(
@@ -476,8 +477,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest2 = (ActiveMQTopic) createDestination("TEST.FOO2", true);
// Setup consumers
- Session ses = createSession("BrokerA");
- Session ses2 = createSession("BrokerC");
+ Session ses = createSession("Broker_A_A");
+ Session ses2 = createSession("Broker_C_C");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientAa = ses.createDurableSubscriber(dest2, "subAa");
MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
@@ -485,33 +486,33 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
Thread.sleep(1000);
//make sure network durables are online
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 1);
clientA.close();
clientC.close();
ses.unsubscribe("subA");
ses2.unsubscribe("subC");
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 1);
clientAa.close();
clientCc.close();
ses.unsubscribe("subAa");
ses2.unsubscribe("subCc");
- assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 0);
- assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 0);
- assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 0);
}
protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
@@ -552,11 +553,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
super.setAutoFail(true);
super.setUp();
String options = new String("?persistent=false&useJmx=false");
- createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
- createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
- createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
- createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options));
- createBroker(new URI("broker:(tcp://localhost:61620)/BrokerE" + options));
+ createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
+ createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
+ createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
+ createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options));
+ createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options));
}
@Override