You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/08/01 14:03:42 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6383
https://issues.apache.org/jira/browse/AMQ-6373
Repository: activemq
Updated Branches:
refs/heads/master 71bb54f94 -> e73ab3483
https://issues.apache.org/jira/browse/AMQ-6383
https://issues.apache.org/jira/browse/AMQ-6373
Adding a new flag that can be specified on a network bridge to allow
forcing of subscriptions to be durable. Cleaned up some unit tests.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e73ab348
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e73ab348
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e73ab348
Branch: refs/heads/master
Commit: e73ab348375db71116624326d29398e0088d7334
Parents: 71bb54f
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Sat Jul 30 10:58:29 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon Aug 1 10:02:56 2016 -0400
----------------------------------------------------------------------
.../activemq/broker/TransportConnection.java | 1 +
.../apache/activemq/network/ConduitBridge.java | 8 +-
.../network/DemandForwardingBridgeSupport.java | 55 ++-
.../activemq/network/DemandSubscription.java | 13 +
.../activemq/network/DurableConduitBridge.java | 51 ++-
...ingToListOfActiveMQDestinationConverter.java | 18 +-
.../network/DurableSyncNetworkBridgeTest.java | 104 +++---
.../network/DynamicNetworkTestSupport.java | 66 +++-
.../network/ForceDurableNetworkBridgeTest.java | 351 +++++++++++++++++++
.../network/VirtualConsumerDemandTest.java | 38 +-
10 files changed, 614 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 5288dc9..a05ba8c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -103,6 +103,7 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 10df1f9..3c0b85b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -46,7 +46,7 @@ public class ConduitBridge extends DemandForwardingBridge {
@Override
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
- if (addToAlreadyInterestedConsumers(info)) {
+ if (addToAlreadyInterestedConsumers(info, false)) {
return null; // don't want this subscription added
}
//add our original id to ourselves
@@ -55,7 +55,7 @@ public class ConduitBridge extends DemandForwardingBridge {
return doCreateDemandSubscription(info);
}
- protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
+ protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) {
// search through existing subscriptions and see if we have a match
if (info.isNetworkSubscription()) {
return false;
@@ -71,6 +71,10 @@ public class ConduitBridge extends DemandForwardingBridge {
// add the interest in the subscription
if (!info.isDurable()) {
ds.add(info.getConsumerId());
+ if (isForcedDurable) {
+ forcedDurableRemoteId.add(info.getConsumerId());
+ ds.addForcedDurableConsumer(info.getConsumerId());
+ }
} else {
ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 865e60e..8a3a56a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -21,9 +21,11 @@ import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -99,6 +101,7 @@ import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,6 +134,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
+ protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
@@ -549,6 +553,21 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// set our properties
Properties props = new Properties();
IntrospectionSupport.getProperties(configuration, props, null);
+
+ String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations";
+ String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations";
+
+ if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) {
+ props.put(dynamicallyIncludedDestinationsKey,
+ StringToListOfActiveMQDestinationConverter.
+ convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true));
+ }
+ if (!configuration.getStaticallyIncludedDestinations().isEmpty()) {
+ props.put(staticallyIncludedDestinationsKey,
+ StringToListOfActiveMQDestinationConverter.
+ convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true));
+ }
+
props.remove("networkTTL");
String str = MarshallingSupport.propertiesToString(props);
brokerInfo.setNetworkProperties(str);
@@ -858,6 +877,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
+
+ if (forcedDurableRemoteId.remove(id)) {
+ for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
+ DemandSubscription ds = i.next();
+ boolean removed = ds.removeForcedDurableConsumer(id);
+ if (removed) {
+ cleanupDurableSub(ds, i);
+ }
+ }
+ }
+
} else if (data.getClass() == RemoveSubscriptionInfo.class) {
RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
@@ -873,7 +903,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private void cleanupDurableSub(final DemandSubscription ds,
Iterator<DemandSubscription> i) throws IOException {
- if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()) {
+ if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
+ && ds.getForcedDurableConsumersSize() == 0) {
// deactivate subscriber
RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
@@ -1196,7 +1227,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
dests = dynamicallyIncludedDestinations;
if (dests != null && dests.length > 0) {
- return matchesDynamicallyIncludedDestinations(destination);
+ for (ActiveMQDestination dest : dests) {
+ DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+ if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+ return true;
+ }
+ }
+
+ return false;
}
return true;
@@ -1216,6 +1254,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return false;
}
+ protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
+ if (dests != null && dests.length > 0) {
+ for (ActiveMQDestination dest : dests) {
+ DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+ if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+ return dest;
+ }
+ }
+ }
+
+ return null;
+ }
+
/**
* Subscriptions for these destinations are always created
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
index b53646a..371df0a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
@@ -41,6 +41,7 @@ public class DemandSubscription {
private final AtomicInteger dispatched = new AtomicInteger(0);
private final AtomicBoolean activeWaiter = new AtomicBoolean();
private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
+ private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
private SubscriptionInfo localDurableSubscriber;
private NetworkBridgeFilter networkBridgeFilter;
@@ -106,6 +107,18 @@ public class DemandSubscription {
return remoteInfo;
}
+ public boolean addForcedDurableConsumer(ConsumerId id) {
+ return forcedDurableConsumers.add(id);
+ }
+
+ public boolean removeForcedDurableConsumer(ConsumerId id) {
+ return forcedDurableConsumers.remove(id);
+ }
+
+ public int getForcedDurableConsumersSize() {
+ return forcedDurableConsumers.size();
+ }
+
public void waitForCompletion() {
if (dispatched.get() > 0) {
LOG.debug("Waiting for completion for sub: {}, dispatched: {}", localInfo.getConsumerId(), this.dispatched.get());
http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 237e272..e699272 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -17,7 +17,9 @@
package org.apache.activemq.network;
import java.io.IOException;
+import java.util.Map;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
@@ -26,6 +28,7 @@ import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.TypeConversionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,13 +95,16 @@ public class DurableConduitBridge extends ConduitBridge {
@Override
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
- if (addToAlreadyInterestedConsumers(info)) {
+ boolean isForcedDurable = isForcedDurable(info);
+
+ if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
return null; // don't want this subscription added
}
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
+ ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null;
- if (info.isDurable()) {
+ if(info.isDurable() || isForcedDurable) {
// set the subscriber name to something reproducible
info.setSubscriptionName(getSubscriberName(info.getDestination()));
// and override the consumerId with something unique so that it won't
@@ -107,7 +113,46 @@ public class DurableConduitBridge extends ConduitBridge {
consumerIdGenerator.getNextSequenceId()));
}
info.setSelector(null);
- return doCreateDemandSubscription(info);
+ DemandSubscription demandSubscription = doCreateDemandSubscription(info);
+ if (forcedDurableId != null) {
+ demandSubscription.addForcedDurableConsumer(forcedDurableId);
+ forcedDurableRemoteId.add(forcedDurableId);
+ }
+ return demandSubscription;
+ }
+
+
+ private boolean isForcedDurable(ConsumerInfo info) {
+ if (info.isDurable()) {
+ return false;
+ }
+
+ ActiveMQDestination destination = info.getDestination();
+ if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
+ destination.isQueue()) {
+ return false;
+ }
+
+ ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
+ if (matching != null) {
+ return isDestForcedDurable(matching);
+ }
+ matching = findMatchingDestination(staticallyIncludedDestinations, destination);
+ if (matching != null) {
+ return isDestForcedDurable(matching);
+ }
+ return false;
+ }
+
+ private boolean isDestForcedDurable(ActiveMQDestination destination) {
+ final Map<String, String> options = destination.getOptions();
+
+ boolean isForceDurable = false;
+ if (options != null) {
+ isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
+ }
+
+ return isForceDurable;
}
protected String getSubscriberName(ActiveMQDestination dest) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java b/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java
index 7f58f20..130afa7 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.util;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
@@ -59,6 +61,10 @@ public class StringToListOfActiveMQDestinationConverter {
}
public static String convertFromActiveMQDestination(Object value) {
+ return convertFromActiveMQDestination(value, false);
+ }
+
+ public static String convertFromActiveMQDestination(Object value, boolean includeOptions) {
if (value == null) {
return null;
}
@@ -70,7 +76,17 @@ public class StringToListOfActiveMQDestinationConverter {
Object e = list.get(i);
if (e instanceof ActiveMQDestination) {
ActiveMQDestination destination = (ActiveMQDestination) e;
- sb.append(destination);
+ if (includeOptions && destination.getOptions() != null) {
+ try {
+ //Reapply the options as URI parameters
+ sb.append(destination.toString() + URISupport.applyParameters(
+ new URI(""), destination.getOptions()));
+ } catch (URISyntaxException e1) {
+ sb.append(destination);
+ }
+ } else {
+ sb.append(destination);
+ }
if (i < list.size() - 1) {
sb.append(", ");
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 1390d05..3c4a2a0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.network;
-import static org.junit.Assert.assertTrue;
-
import java.io.File;
import java.net.URI;
import java.util.Arrays;
@@ -30,12 +28,10 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
@@ -98,12 +94,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
sub1.close();
assertSubscriptionsCount(broker1, topic, 1);
- assertNCSubscriptionsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
assertSubscriptionsCount(broker1, topic, 0);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
}
@@ -114,17 +110,17 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
sub1.close();
assertSubscriptionsCount(broker1, topic, 1);
- assertNCSubscriptionsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
restartBrokers(true);
assertSubscriptionsCount(broker1, topic, 1);
- assertNCSubscriptionsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
assertSubscriptionsCount(broker1, topic, 0);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
}
@@ -135,7 +131,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
sub1.close();
assertSubscriptionsCount(broker1, topic, 1);
- assertNCSubscriptionsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
doTearDown();
restartBroker(broker1, false);
@@ -146,9 +142,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//Test that on successful reconnection of the bridge that
//the NC sub will be removed
restartBroker(broker2, true);
- assertNCSubscriptionsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
restartBroker(broker1, true);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
}
@@ -160,7 +156,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
sub1.close();
assertSubscriptionsCount(broker1, topic, 1);
- assertNCSubscriptionsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
doTearDown();
restartBroker(broker1, false);
@@ -176,13 +172,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//before sync, the old NC should exist
restartBroker(broker2, true);
- assertNCSubscriptionsCount(broker2, topic, 1);
- assertNCSubscriptionsCount(broker2, topic2, 0);
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic2, 0);
//After sync, remove old NC and create one for topic 2
restartBroker(broker1, true);
- assertNCSubscriptionsCount(broker2, topic, 0);
- assertNCSubscriptionsCount(broker2, topic2, 1);
+ assertNCDurableSubsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic2, 1);
}
@@ -193,7 +189,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
assertSubscriptionsCount(broker1, topic, 0);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
doTearDown();
restartBroker(broker1, false);
@@ -207,9 +203,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic2, 1);
restartBrokers(true);
- assertNCSubscriptionsCount(broker2, topic, 1);
- assertNCSubscriptionsCount(broker2, topic2, 1);
- assertNCSubscriptionsCount(broker2, excludeTopic, 0);
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic2, 1);
+ assertNCDurableSubsCount(broker2, excludeTopic, 0);
}
@@ -223,7 +219,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
assertSubscriptionsCount(broker1, topic, 0);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
doTearDown();
restartBroker(broker1, false);
@@ -235,7 +231,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//Since we are using an old version of openwire, the NC should
//not be added
restartBrokers(true);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
}
@@ -246,7 +242,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
assertSubscriptionsCount(broker1, topic, 0);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
doTearDown();
restartBroker(broker1, false);
@@ -256,7 +252,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1);
restartBrokers(true);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
}
@Test
@@ -266,7 +262,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
assertSubscriptionsCount(broker1, topic, 0);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
doTearDown();
restartBroker(broker1, false);
@@ -276,10 +272,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1);
restartBrokers(true);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
//bring online again
session1.createDurableSubscriber(topic, subName);
- assertNCSubscriptionsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
}
@@ -290,7 +286,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
session1.createDurableSubscriber(topic, subName).close();
assertSubscriptionsCount(broker1, topic, 1);
- assertNCSubscriptionsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
doTearDown();
restartBroker(broker1, false);
@@ -301,8 +297,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1);
restartBrokers(true);
- assertNCSubscriptionsCount(broker2, topic, 1);
- assertNCSubscriptionsCount(broker2, excludeTopic, 0);
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, excludeTopic, 0);
}
@@ -314,7 +310,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
assertSubscriptionsCount(broker1, topic, 0);
- assertNCSubscriptionsCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
doTearDown();
restartBrokers(false);
@@ -342,32 +338,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
session1.createDurableSubscriber(excludeTopic, "sub-exclude");
Thread.sleep(1000);
- assertNCSubscriptionsCount(broker2, topic, 1);
- assertNCSubscriptionsCount(broker2, excludeTopic, 0);
-
- }
-
- protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic,
- final String subName) throws Exception {
- final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
- info.setClientId(clientId);
- info.setSubscriptionName(subName);
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, excludeTopic, 0);
- final ConnectionContext context = new ConnectionContext();
- context.setBroker(brokerService.getBroker());
- context.setClientId(clientId);
-
- brokerService.getBroker().removeSubscription(context, info);
- }
-
- protected void assertSubscriptionsCount(final BrokerService brokerService,
- final ActiveMQTopic dest, final int count) throws Exception {
- assertTrue(Wait.waitFor(new Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return count == getSubscriptions(brokerService, dest).size();
- }
- }, 10000, 500));
}
protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception {
@@ -387,7 +360,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir,
File remoteDataDir) throws Exception {
included = new ActiveMQTopic(testTopicName);
- doSetUpRemoteBroker(deleteAllMessages, remoteDataDir);
+ doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir);
//Give time for advisories to propagate
Thread.sleep(1000);
@@ -399,8 +372,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
}
protected void restartRemoteBroker() throws Exception {
+ int port = 0;
+ if (remoteBroker != null) {
+ List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+ port = transportConnectors.get(0).getConnectUri().getPort();
+ }
stopRemoteBroker();
- doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile());
+ doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile(), port);
}
protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector,
@@ -438,8 +416,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
}
}
- protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir) throws Exception {
- remoteBroker = createRemoteBroker(dataDir);
+ protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception {
+ remoteBroker = createRemoteBroker(dataDir, port);
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start();
remoteBroker.waitUntilStarted();
@@ -494,7 +472,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
protected AdvisoryBroker remoteAdvisoryBroker;
- protected BrokerService createRemoteBroker(File dataDir) throws Exception {
+ protected BrokerService createRemoteBroker(File dataDir, int port) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setBrokerName("remoteBroker");
brokerService.setUseJmx(false);
@@ -502,7 +480,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
- brokerService.addConnector("tcp://localhost:0?wireFormat.version=" + remoteBrokerWireFormatVersion);
+ brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.version=" + remoteBrokerWireFormatVersion);
return brokerService;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 29b726a..0b388cc 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -30,8 +30,11 @@ import javax.jms.Session;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -96,24 +99,24 @@ public abstract class DynamicNetworkTestSupport {
}
protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception {
- Wait.waitFor(new Wait.Condition() {
+ assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
//should only be 1 for the composite destination creation
return count == destinationStatistics.getConsumers().getCount();
}
- });
+ }));
}
protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception {
- Wait.waitFor(new Wait.Condition() {
+ assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return count == destinationStatistics.getDequeues().getCount() &&
count == destinationStatistics.getDispatched().getCount() &&
count == destinationStatistics.getForwards().getCount();
}
- });
+ }));
}
protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) {
@@ -126,16 +129,32 @@ public abstract class DynamicNetworkTestSupport {
MessageConsumer createConsumer() throws JMSException;
}
- protected void assertNCSubscriptionsCount(final BrokerService brokerService,
+ protected void assertNCDurableSubsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
- return count == getNCSubscriptions(brokerService, dest).size();
+ return count == getNCDurableSubs(brokerService, dest).size();
}
}, 10000, 500));
}
+ protected void assertConsumersCount(final BrokerService brokerService,
+ final ActiveMQTopic dest, final int count) throws Exception {
+ assertTrue(Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return count == getConsumers(brokerService, dest).size();
+ }
+ }, 10000, 500));
+ }
+
+ protected List<Subscription> getConsumers(final BrokerService brokerService,
+ final ActiveMQTopic dest) throws Exception {
+ Topic destination = (Topic) brokerService.getDestination(dest);
+ return destination.getConsumers();
+ }
+
protected List<DurableTopicSubscription> getSubscriptions(final BrokerService brokerService,
final ActiveMQTopic dest) throws Exception {
List<DurableTopicSubscription> subs = new ArrayList<>();
@@ -151,10 +170,17 @@ public abstract class DynamicNetworkTestSupport {
return subs;
}
- protected List<DurableTopicSubscription> getNCSubscriptions(final BrokerService brokerService,
+ protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
final ActiveMQTopic dest) throws Exception {
List<DurableTopicSubscription> subs = new ArrayList<>();
- Topic destination = (Topic) brokerService.getDestination(dest);
+ Destination d = brokerService.getDestination(dest);
+ Topic destination = null;
+ if (d instanceof DestinationFilter){
+ destination = ((DestinationFilter) d).getAdaptor(Topic.class);
+ } else {
+ destination = (Topic) d;
+ }
+
for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
@@ -166,4 +192,28 @@ public abstract class DynamicNetworkTestSupport {
return subs;
}
+ protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic,
+ final String subName) throws Exception {
+ final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
+ info.setClientId(clientId);
+ info.setSubscriptionName(subName);
+
+ final ConnectionContext context = new ConnectionContext();
+ context.setBroker(brokerService.getBroker());
+ context.setClientId(clientId);
+
+ brokerService.getBroker().removeSubscription(context, info);
+ }
+
+ protected void assertSubscriptionsCount(final BrokerService brokerService,
+ final ActiveMQTopic dest, final int count) throws Exception {
+ assertTrue(Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return count == getSubscriptions(brokerService, dest).size();
+ }
+ }, 10000, 500));
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
new file mode 100644
index 0000000..770ba6e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(ForceDurableNetworkBridgeTest.class);
+
+ protected String testTopicName2 = "include.nonforced.bar";
+ protected String staticTopic = "include.static.bar";
+ protected String staticTopic2 = "include.static.nonforced.bar";
+ public static enum FLOW {FORWARD, REVERSE};
+ private BrokerService broker1;
+ private BrokerService broker2;
+ private Session session1;
+ private final FLOW flow;
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {FLOW.FORWARD},
+ {FLOW.REVERSE}
+ });
+ }
+
+ public ForceDurableNetworkBridgeTest(final FLOW flow) {
+ this.flow = flow;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ doSetUp(true, tempFolder.newFolder(), tempFolder.newFolder());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ doTearDown();
+ }
+
+ @Test
+ public void testForceDurableSubscriptionStatic() throws Exception {
+ final ActiveMQTopic topic = new ActiveMQTopic(staticTopic);
+
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertConsumersCount(broker2, topic, 1);
+
+ //Static so consumers stick around
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertConsumersCount(broker2, topic, 1);
+ }
+
+ @Test
+ public void testConsumerNotForceDurableSubscriptionStatic() throws Exception {
+ final ActiveMQTopic topic = new ActiveMQTopic(staticTopic2);
+
+ assertConsumersCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 0);
+ }
+
+ @Test
+ public void testConsumerNotForceDurableSubscription() throws Exception {
+ final ActiveMQTopic topic = new ActiveMQTopic(testTopicName2);
+ MessageConsumer sub1 = session1.createConsumer(topic);
+
+ assertConsumersCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 0);
+ sub1.close();
+
+ assertNCDurableSubsCount(broker2, topic, 0);
+ assertConsumersCount(broker2, topic, 0);
+ }
+
+ @Test
+ public void testConsumerNotForceDurableWithAnotherDurable() throws Exception {
+ final ActiveMQTopic topic = new ActiveMQTopic(testTopicName2);
+ TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
+ session1.createConsumer(topic);
+
+ //1 consumer because of conduit
+ //1 durable sub
+ assertConsumersCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
+
+ //Remove the sub
+ durSub.close();
+ Thread.sleep(1000);
+ removeSubscription(broker1, topic, subName);
+
+ //The durable should be gone even though there is a consumer left
+ //since we are not forcing durable subs
+ assertNCDurableSubsCount(broker2, topic, 0);
+ //consumers count ends up being 0 here, even though there is a non-durable consumer left,
+ //because the durable sub is destroyed and it is a conduit subscription
+ //this is another good reason to want to enable forcing of durables
+ assertConsumersCount(broker2, topic, 0);
+ }
+
+ @Test
+ public void testForceDurableSubscription() throws Exception {
+ final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+ MessageConsumer sub1 = session1.createConsumer(topic);
+
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertConsumersCount(broker2, topic, 1);
+ sub1.close();
+
+ assertNCDurableSubsCount(broker2, topic, 0);
+ assertConsumersCount(broker2, topic, 0);
+ }
+
+ @Test
+ public void testForceDurableMultiSubscriptions() throws Exception {
+ final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+ MessageConsumer sub1 = session1.createConsumer(topic);
+ MessageConsumer sub2 = session1.createConsumer(topic);
+ MessageConsumer sub3 = session1.createConsumer(topic);
+
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertConsumersCount(broker2, topic, 1);
+ sub1.close();
+ sub2.close();
+
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertConsumersCount(broker2, topic, 1);
+
+ sub3.close();
+
+ assertNCDurableSubsCount(broker2, topic, 0);
+ assertConsumersCount(broker2, topic, 0);
+ }
+
+ @Test
+ public void testForceDurableSubWithDurableCreatedFirst() throws Exception {
+ final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+ TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
+ durSub.close();
+ assertNCDurableSubsCount(broker2, topic, 1);
+
+ MessageConsumer sub1 = session1.createConsumer(topic);
+ Thread.sleep(1000);
+ assertNCDurableSubsCount(broker2, topic, 1);
+ sub1.close();
+
+ Thread.sleep(1000);
+ assertNCDurableSubsCount(broker2, topic, 1);
+
+ removeSubscription(broker1, topic, subName);
+ assertNCDurableSubsCount(broker2, topic, 0);
+ }
+
+ @Test
+ public void testForceDurableSubWithNonDurableCreatedFirst() throws Exception {
+ final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+ MessageConsumer sub1 = session1.createConsumer(topic);
+ assertNCDurableSubsCount(broker2, topic, 1);
+
+ TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
+ durSub.close();
+ Thread.sleep(1000);
+ assertNCDurableSubsCount(broker2, topic, 1);
+
+ removeSubscription(broker1, topic, subName);
+ Thread.sleep(1000);
+ assertConsumersCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
+
+ sub1.close();
+ assertNCDurableSubsCount(broker2, topic, 0);
+ }
+
+ @Test
+ public void testDurableSticksAroundOnConsumerClose() throws Exception {
+ final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+ //Create the normal consumer first
+ MessageConsumer sub1 = session1.createConsumer(topic);
+ assertNCDurableSubsCount(broker2, topic, 1);
+
+ TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
+ durSub.close();
+ sub1.close();
+ Thread.sleep(1000);
+ //Both consumer and durable are closed but the durable should stick around
+ assertConsumersCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
+
+ removeSubscription(broker1, topic, subName);
+ assertConsumersCount(broker2, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
+ }
+
+ protected void restartBrokers() throws Exception {
+ doTearDown();
+ doSetUp(false, localBroker.getDataDirectoryFile(), remoteBroker.getDataDirectoryFile());
+ }
+
+ protected void doSetUp(boolean deleteAllMessages, File localDataDir,
+ File remoteDataDir) throws Exception {
+ included = new ActiveMQTopic(testTopicName);
+ doSetUpRemoteBroker(deleteAllMessages, remoteDataDir);
+ doSetUpLocalBroker(deleteAllMessages, localDataDir);
+ //Give time for advisories to propagate
+ Thread.sleep(1000);
+ }
+
+ protected void doSetUpLocalBroker(boolean deleteAllMessages, File dataDir) throws Exception {
+ localBroker = createLocalBroker(dataDir);
+ localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ localBroker.start();
+ localBroker.waitUntilStarted();
+ URI localURI = localBroker.getVmConnectorURI();
+ ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
+ fac.setAlwaysSyncSend(true);
+ fac.setDispatchAsync(false);
+ localConnection = fac.createConnection();
+ localConnection.setClientID("clientId");
+ localConnection.start();
+
+ Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
+ }
+ }, 10000, 500);
+
+ localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ if (flow.equals(FLOW.FORWARD)) {
+ broker1 = localBroker;
+ session1 = localSession;
+ } else {
+ broker2 = localBroker;
+ }
+ }
+
+ protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir) throws Exception {
+ remoteBroker = createRemoteBroker(dataDir);
+ remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ remoteBroker.start();
+ remoteBroker.waitUntilStarted();
+ URI remoteURI = remoteBroker.getVmConnectorURI();
+ ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(remoteURI);
+ remoteConnection = fac.createConnection();
+ remoteConnection.setClientID("clientId");
+ remoteConnection.start();
+ remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ if (flow.equals(FLOW.FORWARD)) {
+ broker2 = remoteBroker;
+ } else {
+ broker1 = remoteBroker;
+ session1 = remoteSession;
+ }
+ }
+
+ protected BrokerService createLocalBroker(File dataDir) throws Exception {
+ BrokerService brokerService = new BrokerService();
+ brokerService.setMonitorConnectionSplits(true);
+ brokerService.setDataDirectoryFile(dataDir);
+ brokerService.setBrokerName("localBroker");
+ brokerService.addNetworkConnector(configureLocalNetworkConnector());
+ brokerService.addConnector("tcp://localhost:0");
+ brokerService.setDestinations(new ActiveMQDestination[] {
+ new ActiveMQTopic(testTopicName),
+ new ActiveMQTopic(testTopicName2),
+ new ActiveMQTopic(excludeTopicName)});
+
+ return brokerService;
+ }
+
+ protected NetworkConnector configureLocalNetworkConnector() throws Exception {
+ List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+ URI remoteURI = transportConnectors.get(0).getConnectUri();
+ String uri = "static:(" + remoteURI + ")";
+ NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
+ connector.setName("networkConnector");
+ connector.setDynamicOnly(false);
+ connector.setDecreaseNetworkConsumerPriority(false);
+ connector.setConduitSubscriptions(true);
+ connector.setDuplex(true);
+ connector.setStaticBridge(false);
+ connector.setStaticallyIncludedDestinations(Lists.<ActiveMQDestination>newArrayList(
+ new ActiveMQTopic(staticTopic + "?forceDurable=true"),
+ new ActiveMQTopic(staticTopic2)));
+ connector.setDynamicallyIncludedDestinations(
+ Lists.<ActiveMQDestination>newArrayList(
+ new ActiveMQTopic("include.test.>?forceDurable=true"),
+ new ActiveMQTopic(testTopicName2)));
+ connector.setExcludedDestinations(
+ Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(excludeTopicName)));
+ return connector;
+ }
+
+
+ protected BrokerService createRemoteBroker(File dataDir) throws Exception {
+ BrokerService brokerService = new BrokerService();
+ brokerService.setBrokerName("remoteBroker");
+ brokerService.setUseJmx(false);
+ brokerService.setDataDirectoryFile(dataDir);
+ brokerService.addConnector("tcp://localhost:0");
+ brokerService.setDestinations(new ActiveMQDestination[] {
+ new ActiveMQTopic(testTopicName),
+ new ActiveMQTopic(testTopicName2),
+ new ActiveMQTopic(excludeTopicName)});
+
+ return brokerService;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e73ab348/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index f474929..782f53f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -25,6 +25,7 @@ import java.lang.reflect.Field;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@@ -39,6 +40,7 @@ import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
@@ -240,9 +242,18 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
*/
@Test(timeout = 60 * 1000)
public void testDynamicFlow() throws Exception {
+ testDynamicFlow(false);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testDynamicFlowForceDurable() throws Exception {
+ testDynamicFlow(true);
+ }
+
+ protected void testDynamicFlow(boolean forceDurable) throws Exception {
Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
- doSetUp(true, null);
+ doSetUp(true, null, true, forceDurable);
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
@@ -262,6 +273,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
waitForConsumerCount(destinationStatistics, 1);
+ assertNCDurableSubsCount(localBroker, included, forceDurable ? 1 : 0);
includedProducer.send(test);
waitForDispatchFromLocalBroker(destinationStatistics, 1);
@@ -272,7 +284,6 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
assertAdvisoryBrokerCounts(1,1,1);
}
-
/**
* Test that dynamic flow works for virtual destinations when a second composite
* topic is included that forwards to the same queue, but is excluded from
@@ -1006,7 +1017,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
new ActiveMQQueue("include.test.bar.bridge"));
- doSetUp(true, new VirtualDestination[] {compositeTopic}, false);
+ doSetUp(true, new VirtualDestination[] {compositeTopic}, false, false);
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
@@ -1034,7 +1045,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
new ActiveMQQueue("include.test.bar.bridge"));
- doSetUp(true, new VirtualDestination[] {compositeTopic}, false);
+ doSetUp(true, new VirtualDestination[] {compositeTopic}, false, false);
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
@@ -1291,16 +1302,16 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
protected void doSetUp(boolean deleteAllMessages,
VirtualDestination[] remoteVirtualDests) throws Exception {
- doSetUp(deleteAllMessages, remoteVirtualDests, true);
+ doSetUp(deleteAllMessages, remoteVirtualDests, true, false);
}
protected void doSetUp(boolean deleteAllMessages,
- VirtualDestination[] remoteVirtualDests, boolean startNetworkConnector) throws Exception {
+ VirtualDestination[] remoteVirtualDests, boolean startNetworkConnector, boolean forceDurable) throws Exception {
remoteBroker = createRemoteBroker(isUseVirtualDestSubsOnCreation, remoteVirtualDests);
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start();
remoteBroker.waitUntilStarted();
- localBroker = createLocalBroker(startNetworkConnector);
+ localBroker = createLocalBroker(startNetworkConnector, forceDurable);
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
@@ -1324,13 +1335,16 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
protected NetworkConnector connector;
- protected BrokerService createLocalBroker(boolean startNetworkConnector) throws Exception {
+ protected BrokerService createLocalBroker(boolean startNetworkConnector, boolean forceDurable) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setMonitorConnectionSplits(true);
brokerService.setDataDirectoryFile(tempFolder.newFolder());
brokerService.setBrokerName("localBroker");
- connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61617)"));
+ List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+ URI remoteURI = transportConnectors.get(0).getConnectUri();
+ String uri = "static:(" + remoteURI + ")";
+ connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setName("networkConnector");
connector.setDynamicOnly(false);
connector.setDecreaseNetworkConsumerPriority(false);
@@ -1338,7 +1352,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
connector.setDuplex(isDuplex);
connector.setUseVirtualDestSubs(true);
connector.setDynamicallyIncludedDestinations(Lists.newArrayList(new ActiveMQQueue(testQueueName),
- new ActiveMQTopic(testTopicName), new ActiveMQTopic("VirtualTopic.>")));
+ new ActiveMQTopic(testTopicName + (forceDurable ? "?forceDurable=true" : "")), new ActiveMQTopic("VirtualTopic.>")));
connector.setExcludedDestinations(Lists.newArrayList(new ActiveMQQueue("exclude.test.foo"),
new ActiveMQTopic("exclude.test.bar")));
@@ -1346,7 +1360,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
brokerService.addNetworkConnector(connector);
}
- brokerService.addConnector("tcp://localhost:61616");
+ brokerService.addConnector("tcp://localhost:0");
return brokerService;
}
@@ -1374,7 +1388,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
remoteAdvisoryBroker = (AdvisoryBroker)
brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
- brokerService.addConnector("tcp://localhost:61617");
+ brokerService.addConnector("tcp://localhost:0");
return brokerService;
}