You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/04/08 06:12:01 UTC
svn commit: r762925 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/ test/java/org/apache/activemq/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Tue Apr 7 19:48:29 2009
New Revision: 762925
URL: http://svn.apache.org/viewvc?rev=762925&view=rev
Log:
implement AMQ-2198|https://issues.apache.org/activemq/browse/AMQ-2198 add setSuppressDuplicateQueueSubscriptions option to Network configuration, when true, duplicate subs for the same consumer will be suppressed, relevant when networkTTL > 1
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=762925&r1=762924&r2=762925&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Tue Apr 7 19:48:29 2009
@@ -940,27 +940,32 @@
/*
* check our existing subs networkConsumerIds against the list of network ids in this subscription
- * a match means a duplicate which we suppress for topics
+ * A match means a duplicate which we suppress for topics and maybe for queues
*/
private boolean isDuplicateNetworkSubscription(ConsumerInfo consumerInfo) {
boolean isDuplicate = false;
- if (consumerInfo.getDestination().isTopic()) {
- List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
- if (candidateConsumers.isEmpty()) {
- candidateConsumers.add(consumerInfo.getConsumerId());
- }
- Collection<Subscription> currentSubs = getTopicRegionSubscriptions();
- for (Subscription sub : currentSubs) {
- List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
- if (!networkConsumers.isEmpty()) {
- if (matchFound(candidateConsumers, networkConsumers)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("subscription: " + consumerInfo + " is duplicated by network subscription: "
- + sub.getConsumerInfo() + ", networkComsumerIds: " + networkConsumers);
- }
- isDuplicate = true;
- break;
+
+ if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) {
+ return isDuplicate;
+ }
+
+ List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
+ if (candidateConsumers.isEmpty()) {
+ candidateConsumers.add(consumerInfo.getConsumerId());
+ }
+
+ Collection<Subscription> currentSubs =
+ getRegionSubscriptions(consumerInfo.getDestination().isTopic());
+ for (Subscription sub : currentSubs) {
+ List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
+ if (!networkConsumers.isEmpty()) {
+ if (matchFound(candidateConsumers, networkConsumers)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("subscription: " + consumerInfo + " is duplicated by network subscription: "
+ + sub.getConsumerInfo() + ", networkComsumerIds: " + networkConsumers);
}
+ isDuplicate = true;
+ break;
}
}
}
@@ -978,12 +983,13 @@
return found;
}
- private final Collection<Subscription> getTopicRegionSubscriptions() {
+ private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
- AbstractRegion topicRegion = (AbstractRegion) region.getTopicRegion();
- return topicRegion.getSubscriptions().values();
+ AbstractRegion abstractRegion = (AbstractRegion)
+ (isTopic ? region.getTopicRegion() : region.getQueueRegion());
+ return abstractRegion.getSubscriptions().values();
}
-
+
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=762925&r1=762924&r2=762925&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Tue Apr 7 19:48:29 2009
@@ -36,6 +36,7 @@
private String password;
private String destinationFilter = ">";
private String name = null;
+ private boolean suppressDuplicateQueueSubscriptions = false;
/**
* @return the conduitSubscriptions
@@ -222,4 +223,16 @@
public void setName(String name) {
this.name = name;
}
+
+ public boolean isSuppressDuplicateQueueSubscriptions() {
+ return suppressDuplicateQueueSubscriptions;
+ }
+
+ /**
+ *
+ * @param val if true, duplicate network queue subscriptions (in a cyclic network) will be suppressed
+ */
+ public void setSuppressDuplicateQueueSubscriptions(boolean val) {
+ suppressDuplicateQueueSubscriptions = val;
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=762925&r1=762924&r2=762925&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Tue Apr 7 19:48:29 2009
@@ -115,10 +115,10 @@
// This will interconnect all brokes using multicast
protected void bridgeAllBrokers() throws Exception {
- bridgeAllBrokers("default");
+ bridgeAllBrokers("default", 1, false);
}
- protected void bridgeAllBrokers(String groupName) throws Exception {
+ protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
@@ -131,7 +131,9 @@
TransportConnector transport = (TransportConnector)transportConnectors.get(0);
transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName));
- broker.addNetworkConnector("multicast://default?group=" + groupName);
+ NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName);
+ nc.setNetworkTTL(ttl);
+ nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
}
// Multicasting may take longer to setup
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=762925&r1=762924&r2=762925&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Tue Apr 7 19:48:29 2009
@@ -17,12 +17,18 @@
package org.apache.activemq.usecases;
import java.net.URI;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
/**
@@ -270,6 +276,65 @@
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
}
+ public void testNoDuplicateQueueSubs() throws Exception {
+
+ bridgeAllBrokers("default", 3, true);
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ String brokerName = "BrokerA";
+ createConsumer(brokerName, dest);
+
+ // wait for advisories
+ Thread.sleep(2000);
+
+ // verify there is one consumer on each broker, no cycles
+ Collection<BrokerItem> brokerList = brokers.values();
+ for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+ BrokerService broker = i.next().broker;
+ verifyConsumerCount(broker, 1, dest);
+ }
+ }
+
+
+ public void testDuplicateQueueSubs() throws Exception {
+
+ bridgeAllBrokers("default", 3, false);
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ String brokerName = "BrokerA";
+ createConsumer(brokerName, dest);
+
+ // wait for advisories
+ Thread.sleep(2000);
+
+ verifyConsumerCount(brokers.get(brokerName).broker, 1, dest);
+
+ // in a cyclic network, other brokers will get second order consumer
+ // an alternative route to A via each other
+ Collection<BrokerItem> brokerList = brokers.values();
+ for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+ BrokerService broker = i.next().broker;
+ if (!brokerName.equals(broker.getBrokerName())) {
+ verifyConsumerCount(broker, 2, dest);
+ }
+ }
+ }
+
+ private void verifyConsumerCount(BrokerService broker, int count, Destination dest) throws Exception {
+ RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+ Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
+ assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size());
+ }
+
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();