You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/10/20 17:47:49 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6472
Repository: activemq
Updated Branches:
refs/heads/master 52ab6ba09 -> d206621a7
https://issues.apache.org/jira/browse/AMQ-6472
Durable sync over a network bridge will now also sync non-durable
subscriptions properly if the consumer belongs to a destination that is
configured to force network durable subscriptions.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d206621a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d206621a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d206621a
Branch: refs/heads/master
Commit: d206621a73e4731b00eff49518ea93ba9a4ee3c0
Parents: 52ab6ba
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Oct 20 13:46:27 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Oct 20 13:47:38 2016 -0400
----------------------------------------------------------------------
.../activemq/advisory/AdvisoryBroker.java | 4 +
.../activemq/broker/TransportConnection.java | 30 +--
.../network/DemandForwardingBridgeSupport.java | 40 +---
.../activemq/network/DurableConduitBridge.java | 41 +----
.../activemq/util/NetworkBridgeUtils.java | 184 +++++++++++++++++++
.../network/DurableSyncNetworkBridgeTest.java | 138 +++++++++++++-
6 files changed, 344 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 5ac201e..556c149 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -893,6 +893,10 @@ public class AdvisoryBroker extends BrokerFilter {
return destinations;
}
+ public ConcurrentMap<ConsumerInfo, VirtualDestination> getVirtualDestinationConsumers() {
+ return virtualDestinationConsumers;
+ }
+
private class VirtualConsumerPair {
private final VirtualDestination virtualDestination;
http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index a05ba8c..a32d4f6 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -20,14 +20,15 @@ import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -39,10 +40,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource;
+import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
@@ -103,7 +106,7 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
-import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
+import org.apache.activemq.util.NetworkBridgeUtils;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1378,23 +1381,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.pendingStop = pendingStop;
}
- public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService) {
- RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
- TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
- List<ConsumerInfo> subscriptionInfos = new ArrayList<>();
- for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) {
- DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key);
- if (sub != null) {
- ConsumerInfo ci = sub.getConsumerInfo().copy();
- ci.setClientId(key.getClientId());
- subscriptionInfos.add(ci);
- }
- }
- BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName());
- bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0]));
- return bsi;
- }
-
private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {
Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
Map<String, String> props = createMap(properties);
@@ -1412,7 +1398,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
- dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService()));
+ dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
} catch (Exception e) {
LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
@@ -1425,9 +1411,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
config.setBrokerName(broker.getBrokerName());
- if (config.isSyncDurableSubs() && protocolVersion.get() >= 12) {
+ if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
- dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService()));
+ dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
// check for existing duplex connection hanging about
http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index b58259d..a8c45b0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -99,6 +99,7 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.util.NetworkBridgeUtils;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
@@ -575,7 +576,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
remoteBroker.oneway(brokerInfo);
if (configuration.isSyncDurableSubs() &&
remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
- remoteBroker.oneway(TransportConnection.getBrokerSubscriptionInfo(brokerService));
+ remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
+ configuration));
}
}
if (remoteConnectionInfo != null) {
@@ -656,8 +658,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (started.get()) {
if (subInfo.getSubscriptionInfos() != null) {
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
- if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
- matchesDynamicallyIncludedDestinations(info.getDestination())) {
+ //re-add and process any non-NC consumers that match the
+ //dynamicallyIncludedDestinations list
+ if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
+ NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
serviceRemoteConsumerAdvisory(info);
}
}
@@ -666,7 +670,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
//After re-added, clean up any empty durables
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
- if (matchesDynamicallyIncludedDestinations(ds.getLocalInfo().getDestination())) {
+ if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
cleanupDurableSub(ds, i);
}
}
@@ -907,7 +911,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
Iterator<DemandSubscription> i) throws IOException {
if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
&& ds.getForcedDurableConsumersSize() == 0) {
-
// deactivate subscriber
RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
localBroker.oneway(removeInfo);
@@ -1245,33 +1248,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return true;
}
- private boolean matchesDynamicallyIncludedDestinations(ActiveMQDestination destination) {
- ActiveMQDestination[] dests = dynamicallyIncludedDestinations;
- if (dests != null && dests.length > 0) {
- for (ActiveMQDestination dest : dests) {
- DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
- if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
- return true;
- }
- }
- }
-
- return false;
- }
-
- protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
- if (dests != null && dests.length > 0) {
- for (ActiveMQDestination dest : dests) {
- DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
- if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
- return dest;
- }
- }
- }
-
- return null;
- }
-
/**
* Subscriptions for these destinations are always created
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 969c386..50c9855 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -17,9 +17,7 @@
package org.apache.activemq.network;
import java.io.IOException;
-import java.util.Map;
-import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
@@ -29,7 +27,7 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.TypeConversionSupport;
+import org.apache.activemq.util.NetworkBridgeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,7 +118,8 @@ public class DurableConduitBridge extends ConduitBridge {
@Override
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
- boolean isForcedDurable = isForcedDurable(info);
+ boolean isForcedDurable = NetworkBridgeUtils.isForcedDurable(info,
+ dynamicallyIncludedDestinations, staticallyIncludedDestinations);
if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
return null; // don't want this subscription added
@@ -146,40 +145,6 @@ public class DurableConduitBridge extends ConduitBridge {
return demandSubscription;
}
-
- private boolean isForcedDurable(ConsumerInfo info) {
- if (info.isDurable()) {
- return false;
- }
-
- ActiveMQDestination destination = info.getDestination();
- if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
- destination.isQueue()) {
- return false;
- }
-
- ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
- if (matching != null) {
- return isDestForcedDurable(matching);
- }
- matching = findMatchingDestination(staticallyIncludedDestinations, destination);
- if (matching != null) {
- return isDestForcedDurable(matching);
- }
- return false;
- }
-
- private boolean isDestForcedDurable(ActiveMQDestination destination) {
- final Map<String, String> options = destination.getOptions();
-
- boolean isForceDurable = false;
- if (options != null) {
- isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
- }
-
- return isForceDurable;
- }
-
protected String getSubscriberName(ActiveMQDestination dest) {
String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
return subscriberName;
http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java b/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
new file mode 100644
index 0000000..700baf6
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicRegion;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerSubscriptionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetworkBridgeUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeUtils.class);
+
+ /**
+ * Generate the BrokerSubscriptionInfo which is used to tell the broker on the other
+ * side of the network bridge which NC durable subscriptions are still needed for demand.
+ * @param brokerService
+ * @param config
+ * @return
+ */
+ public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService,
+ final NetworkBridgeConfiguration config) {
+
+ RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
+ TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+ Set<ConsumerInfo> subscriptionInfos = new HashSet<>();
+
+ //Add all durable subscriptions to the set that match the network config
+ //which currently is just the dynamicallyIncludedDestinations list
+ for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) {
+ DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key);
+ if (sub != null && NetworkBridgeUtils.matchesNetworkConfig(config, sub.getConsumerInfo().getDestination())) {
+ ConsumerInfo ci = sub.getConsumerInfo().copy();
+ ci.setClientId(key.getClientId());
+ subscriptionInfos.add(ci);
+ }
+ }
+
+ //We also need to iterate over all normal subscriptions and check if they are part of
+ //any dynamicallyIncludedDestination that is configured with forceDurable to be true
+ //over the network bridge. If forceDurable is true then we want to add the consumer to the set
+ for (Subscription sub : topicRegion.getSubscriptions().values()) {
+ if (sub != null && NetworkBridgeUtils.isForcedDurable(sub.getConsumerInfo(),
+ config.getDynamicallyIncludedDestinations())) {
+ subscriptionInfos.add(sub.getConsumerInfo().copy());
+ }
+ }
+
+ try {
+ //Lastly, if isUseVirtualDestSubs is configured on this broker (to fire advisories) and
+ //configured on the network connector (to listen to advisories) then also add any virtual
+ //dest subscription to the set if forceDurable is true for its destination
+ AdvisoryBroker ab = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
+ if (ab != null && brokerService.isUseVirtualDestSubs() && config.isUseVirtualDestSubs()) {
+ for (ConsumerInfo info : ab.getVirtualDestinationConsumers().keySet()) {
+ if (NetworkBridgeUtils.isForcedDurable(info, config.getDynamicallyIncludedDestinations())) {
+ subscriptionInfos.add(info.copy());
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Error processing virtualDestinationSubs for BrokerSubscriptionInfo");
+ LOG.debug("Error processing virtualDestinationSubs for BrokerSubscriptionInfo", e);
+ }
+ BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName());
+ bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0]));
+ return bsi;
+ }
+
+ public static boolean isForcedDurable(final ConsumerInfo info,
+ final List<ActiveMQDestination> dynamicallyIncludedDestinations) {
+ return dynamicallyIncludedDestinations != null
+ ? isForcedDurable(info,
+ dynamicallyIncludedDestinations.toArray(new ActiveMQDestination[0]), null) : false;
+ }
+
+ public static boolean isForcedDurable(final ConsumerInfo info,
+ final ActiveMQDestination[] dynamicallyIncludedDestinations,
+ final ActiveMQDestination[] staticallyIncludedDestinations) {
+
+ if (info.isDurable() || info.getDestination().isQueue()) {
+ return false;
+ }
+
+ ActiveMQDestination destination = info.getDestination();
+ if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
+ destination.isQueue()) {
+ return false;
+ }
+
+ ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
+ if (matching != null) {
+ return isDestForcedDurable(matching);
+ }
+ matching = findMatchingDestination(staticallyIncludedDestinations, destination);
+ if (matching != null) {
+ return isDestForcedDurable(matching);
+ }
+ return false;
+ }
+
+ public static boolean matchesNetworkConfig(final NetworkBridgeConfiguration config,
+ ActiveMQDestination destination) {
+ List<ActiveMQDestination> includedDests = config.getDynamicallyIncludedDestinations();
+ if (includedDests != null && includedDests.size() > 0) {
+ for (ActiveMQDestination dest : includedDests) {
+ DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+ if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ public static boolean matchesDestinations(ActiveMQDestination[] dests, final ActiveMQDestination destination) {
+ if (dests != null && dests.length > 0) {
+ for (ActiveMQDestination dest : dests) {
+ DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+ if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ public static ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
+ if (dests != null && dests.length > 0) {
+ for (ActiveMQDestination dest : dests) {
+ DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+ if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+ return dest;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ public static boolean isDestForcedDurable(final ActiveMQDestination destination) {
+ boolean isForceDurable = false;
+ if (destination != null) {
+ final Map<String, String> options = destination.getOptions();
+
+ if (options != null) {
+ isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
+ }
+ }
+
+ return isForceDurable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d206621a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 62b3dec..4a705f3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.network;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
import java.net.URI;
import java.util.Arrays;
@@ -23,17 +26,25 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.virtual.CompositeTopic;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.util.Wait;
@@ -57,11 +68,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
+ protected JavaRuntimeConfigurationBroker remoteRuntimeBroker;
protected String staticIncludeTopics = "include.static.test";
protected String includedTopics = "include.test.>";
protected String testTopicName2 = "include.test.bar2";
private boolean dynamicOnly = false;
private boolean forceDurable = false;
+ private boolean useVirtualDestSubs = false;
private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
public static enum FLOW {FORWARD, REVERSE};
@@ -107,6 +120,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
staticIncludeTopics = "include.static.test";
dynamicOnly = false;
forceDurable = false;
+ useVirtualDestSubs = false;
remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder());
}
@@ -521,6 +535,116 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
}
+ @Test(timeout = 60 * 1000)
+ public void testVirtualDestSubForceDurableSync() throws Exception {
+ Assume.assumeTrue(flow == FLOW.FORWARD);
+ forceDurable = true;
+ useVirtualDestSubs = true;
+ this.restartBrokers(true);
+
+ //configure a virtual destination that forwards messages from topic testTopicName
+ CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
+ new ActiveMQQueue("include.test.bar.bridge"));
+ remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true);
+
+ MessageProducer includedProducer = localSession.createProducer(included);
+ Message test = localSession.createTextMessage("test");
+
+ final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics();
+ final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination(
+ new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
+
+ //Make sure that the NC durable is created because of the compositeTopic
+ waitForConsumerCount(destinationStatistics, 1);
+ assertNCDurableSubsCount(localBroker, included, 1);
+
+ //Send message and make sure it is dispatched across the bridge
+ includedProducer.send(test);
+ waitForDispatchFromLocalBroker(destinationStatistics, 1);
+ assertLocalBrokerStatistics(destinationStatistics, 1);
+ assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount());
+
+ //Stop the remote broker so the bridge stops and then send 500 messages so
+ //the messages build up on the NC durable
+ this.stopRemoteBroker();
+ for (int i = 0; i < 500; i++) {
+ includedProducer.send(test);
+ }
+ this.stopLocalBroker();
+
+ //Restart the brokers
+ this.restartRemoteBroker();
+ remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true);
+ this.restartLocalBroker(true);
+
+ //We now need to verify that 501 messages made it to the queue on the remote side
+ //which means that the NC durable was not deleted and recreated during the sync
+ final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination(
+ new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
+
+ assertTrue(Wait.waitFor(new Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remoteDestStatistics2.getMessages().getCount() == 501;
+ }
+ }));
+
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testForceDurableTopicSubSync() throws Exception {
+ Assume.assumeTrue(flow == FLOW.FORWARD);
+ forceDurable = true;
+ this.restartBrokers(true);
+
+ //create a consumer on the remote broker for the included destination
+ remoteSession.createConsumer(included);
+
+ MessageProducer includedProducer = localSession.createProducer(included);
+ Message test = localSession.createTextMessage("test");
+
+ final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics();
+
+ //Make sure that the NC durable is created because of the remote consumer
+ waitForConsumerCount(destinationStatistics, 1);
+ assertNCDurableSubsCount(localBroker, included, 1);
+
+ //Send message and make sure it is dispatched across the bridge
+ includedProducer.send(test);
+ waitForDispatchFromLocalBroker(destinationStatistics, 1);
+ assertLocalBrokerStatistics(destinationStatistics, 1);
+
+ //Stop the network connector and send messages to the local broker so they build
+ //up on the durable
+ this.localBroker.getNetworkConnectorByName("networkConnector").stop();
+
+ for (int i = 0; i < 500; i++) {
+ includedProducer.send(test);
+ }
+
+ //restart the local broker and bridge
+ this.stopLocalBroker();
+ this.restartLocalBroker(true);
+
+ //We now need to verify that the 500 messages on the NC durable are dispatched
+ //on bridge sync which shows that the durable wasn't destroyed/recreated
+ final DestinationStatistics destinationStatistics2 =
+ localBroker.getDestination(included).getDestinationStatistics();
+ waitForDispatchFromLocalBroker(destinationStatistics2, 500);
+ assertLocalBrokerStatistics(destinationStatistics2, 500);
+
+ }
+
+ protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination...forwardTo) {
+ CompositeTopic compositeTopic = new CompositeTopic();
+ compositeTopic.setName(name);
+ compositeTopic.setForwardOnly(true);
+ compositeTopic.setForwardTo( Lists.newArrayList(forwardTo));
+
+ return compositeTopic;
+ }
+
protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception {
if (broker.getBrokerName().equals("localBroker")) {
restartLocalBroker(startNetworkConnector);
@@ -607,12 +731,15 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
if (flow.equals(FLOW.FORWARD)) {
broker2 = remoteBroker;
session2 = remoteSession;
+ remoteRuntimeBroker = (JavaRuntimeConfigurationBroker)
+ remoteBroker.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
} else {
broker1 = remoteBroker;
session1 = remoteSession;
}
}
+
protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setMonitorConnectionSplits(true);
@@ -622,6 +749,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
adapter.setDirectory(dataDir);
adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
brokerService.setPersistenceAdapter(adapter);
+ brokerService.setUseVirtualDestSubs(useVirtualDestSubs);
+ brokerService.setUseVirtualDestSubsOnCreation(useVirtualDestSubs);
if (startNetworkConnector) {
brokerService.addNetworkConnector(configureLocalNetworkConnector());
@@ -645,10 +774,11 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
connector.setDuplex(true);
connector.setStaticBridge(false);
connector.setSyncDurableSubs(true);
+ connector.setUseVirtualDestSubs(useVirtualDestSubs);
connector.setStaticallyIncludedDestinations(
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(staticIncludeTopics + "?forceDurable=" + forceDurable)));
connector.setDynamicallyIncludedDestinations(
- Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(includedTopics)));
+ Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(includedTopics + "?forceDurable=" + forceDurable)));
connector.setExcludedDestinations(
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(excludeTopicName)));
return connector;
@@ -665,6 +795,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
adapter.setDirectory(dataDir);
adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
brokerService.setPersistenceAdapter(adapter);
+ brokerService.setUseVirtualDestSubs(useVirtualDestSubs);
+ brokerService.setUseVirtualDestSubsOnCreation(useVirtualDestSubs);
+
+ if (useVirtualDestSubs) {
+ brokerService.setPlugins(new BrokerPlugin[] {new JavaRuntimeConfigurationPlugin()});
+ }
remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);