You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/06/24 00:29:47 UTC
svn commit: r787855 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Tue Jun 23 22:29:47 2009
New Revision: 787855
URL: http://svn.apache.org/viewvc?rev=787855&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2298 - respect priority when suppressing duplicate network subscriptions
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=787855&r1=787854&r2=787855&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Tue Jun 23 22:29:47 2009
@@ -559,10 +559,6 @@
// in a cyclic network there can be multiple bridges per broker that can propagate
// a network subscription so there is a need to synchronise on a shared entity
synchronized(brokerService.getVmConnectorURI()) {
- if (isDuplicateNetworkSubscription(info)) {
- // trace in method
- return;
- }
if (addConsumerInfo(info)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
@@ -637,8 +633,8 @@
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
}
- subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+ subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
}
}
@@ -951,50 +947,88 @@
}
protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
- boolean result = false;
+ boolean consumerAdded = false;
ConsumerInfo info = consumerInfo.copy();
addRemoteBrokerToBrokerPath(info);
DemandSubscription sub = createDemandSubscription(info);
if (sub != null) {
- addSubscription(sub);
- result = true;
+ if (duplicateSuppressionIsRequired(sub) ) {
+ undoMapRegistration(sub);
+ } else {
+ addSubscription(sub);
+ consumerAdded = true;
+ }
}
- return result;
+ return consumerAdded;
}
+ private void undoMapRegistration(DemandSubscription sub) {
+ subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
+ subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
+ }
+
/*
* check our existing subs networkConsumerIds against the list of network ids in this subscription
* A match means a duplicate which we suppress for topics and maybe for queues
*/
- private boolean isDuplicateNetworkSubscription(ConsumerInfo consumerInfo) {
- boolean isDuplicate = false;
+ private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
+ final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
+ boolean suppress = false;
if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) {
- return isDuplicate;
- }
-
- List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
- if (candidateConsumers.isEmpty()) {
- candidateConsumers.add(consumerInfo.getConsumerId());
+ return suppress;
}
+ List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
Collection<Subscription> currentSubs =
getRegionSubscriptions(consumerInfo.getDestination().isTopic());
for (Subscription sub : currentSubs) {
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
if (!networkConsumers.isEmpty()) {
if (matchFound(candidateConsumers, networkConsumers)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
- + ", sub: " + consumerInfo + " is duplicated by network subscription: "
- + sub.getConsumerInfo() + ", networkComsumerIds: " + networkConsumers);
- }
- isDuplicate = true;
+ suppress = hasLowerPriority(sub, candidate.getLocalInfo());
break;
}
}
}
- return isDuplicate;
+ return suppress;
+ }
+
+
+ private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
+ boolean suppress = false;
+
+ if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
+ + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
+ + existingSub.getConsumerInfo() + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
+ }
+ suppress = true;
+ } else {
+ // remove the existing lower priority duplicate and allow this candidate
+ try {
+ removeDuplicateSubscription(existingSub);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
+ + " with sub from " + remoteBrokerName
+ + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
+ + candidateInfo.getNetworkConsumerIds());
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: "+ existingSub, e);
+ }
+ }
+ return suppress;
+ }
+
+ private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
+ for (NetworkConnector connector: brokerService.getNetworkConnectors()) {
+ if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
+ break;
+ }
+ }
}
private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
@@ -1034,15 +1068,14 @@
if (configuration.isDecreaseNetworkConsumerPriority()) {
byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
- if (priority > Byte.MIN_VALUE && info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
- // The longer the path to the consumer, the less it's consumer
- // priority.
- priority -= info.getBrokerPath().length + 1;
- if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
- }
+ if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
+ // The longer the path to the consumer, the less it's consumer priority.
+ priority -= info.getBrokerPath().length + 1;
}
result.getLocalInfo().setPriority(priority);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
+ }
}
configureDemandSubscription(info, result);
return result;
@@ -1079,6 +1112,7 @@
sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
}
+
protected void removeDemandSubscription(ConsumerId id) throws IOException {
DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
if (LOG.isDebugEnabled()) {
@@ -1091,6 +1125,20 @@
}
}
}
+
+ protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
+ boolean removeDone = false;
+ DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
+ if (sub != null) {
+ try {
+ removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
+ removeDone = true;
+ } catch (IOException e) {
+ LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
+ }
+ }
+ return removeDone;
+ }
protected void waitStarted() throws InterruptedException {
startedLatch.await();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=787855&r1=787854&r2=787855&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Tue Jun 23 22:29:47 2009
@@ -16,13 +16,16 @@
*/
package org.apache.activemq.network;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.MBeanServer;
@@ -34,6 +37,7 @@
import org.apache.activemq.broker.jmx.NetworkBridgeView;
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.JMXSupport;
@@ -67,7 +71,8 @@
private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
private BrokerService brokerService;
private ObjectName objectName;
-
+ private ConcurrentLinkedQueue<DemandForwardingBridgeSupport> configuredBridges = new ConcurrentLinkedQueue<DemandForwardingBridgeSupport>();
+
public NetworkConnector() {
}
@@ -186,6 +191,7 @@
dest = (ActiveMQDestination[])topics.toArray(dest);
result.setDurableDestinations(dest);
}
+ configuredBridges.add(result);
return result;
}
@@ -271,4 +277,16 @@
+ JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
}
+ // ask all the bridges as we can't know to which this consumer is tied
+ public boolean removeDemandSubscription(ConsumerId consumerId) {
+ boolean removeSucceeded = false;
+ for (DemandForwardingBridgeSupport bridge: configuredBridges) {
+ if (bridge.removeDemandSubscriptionByLocalId(consumerId)) {
+ removeSucceeded = true;
+ break;
+ }
+ }
+ return removeSucceeded;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=787855&r1=787854&r2=787855&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Tue Jun 23 22:29:47 2009
@@ -22,15 +22,24 @@
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -322,7 +331,7 @@
// Send messages
sendMessages("BrokerB", dest, messageCount);
- assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+ assertTrue("messaged received within time limit", messagesReceived.await(30, TimeUnit.SECONDS));
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
@@ -424,7 +433,7 @@
sendMessages("BrokerB", dest, messageCount);
// Let's try to wait for any messages.
- assertTrue(messagesReceived.await(60, TimeUnit.SECONDS));
+ assertTrue("messages are received within limit", messagesReceived.await(60, TimeUnit.SECONDS));
assertEquals(messageCount, msgs.getMessageCount());
}
@@ -451,8 +460,68 @@
verifyConsumerCount(broker, 1, dest);
}
}
+
+ public void testNoDuplicateQueueSubsHasLowestPriority() throws Exception {
+ boolean suppressQueueDuplicateSubscriptions = true;
+ boolean decreaseNetworkConsumerPriority = true;
+ bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
+
+ // Setup destination
+ final Destination dest = createDestination("TEST.FOO", false);
+
+ // delay the advisory messages so that one can percolate fully (cyclicly) before the other
+ BrokerItem brokerB = brokers.get("BrokerA");
+ brokerB.broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() {
+
+ public Broker installPlugin(Broker broker) throws Exception {
+ return new BrokerFilter(broker) {
+
+ final AtomicInteger count = new AtomicInteger();
+ @Override
+ public void preProcessDispatch(
+ MessageDispatch messageDispatch) {
+ if (messageDispatch.getDestination().getPhysicalName().contains("ActiveMQ.Advisory.Consumer")) {
+ // lets delay the first advisory
+ if (count.getAndIncrement() == 0) {
+ LOG.info("Sleeping on first advisory: " + messageDispatch);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ super.postProcessDispatch(messageDispatch);
+ }
+
+ };
+ }}
+ });
+
+ startAllBrokers();
+
+
+ // Setup consumers
+ String brokerName = "BrokerA";
+ createConsumer(brokerName, dest);
+
+ // wait for advisories
+ Thread.sleep(5000);
+
+ // verify there is one consumer on each broker, no cycles
+ Collection<BrokerItem> brokerList = brokers.values();
+ for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+ BrokerService broker = i.next().broker;
+ verifyConsumerCount(broker, 1, dest);
+ if (!brokerName.equals(broker.getBrokerName())) {
+ verifyConsumePriority(broker, ConsumerInfo.NETWORK_CONSUMER_PRIORITY, dest);
+ }
+ }
+ }
+
+
public void testDuplicateQueueSubs() throws Exception {
bridgeAllBrokers("default", 3, false);
@@ -477,16 +546,25 @@
BrokerService broker = i.next().broker;
if (!brokerName.equals(broker.getBrokerName())) {
verifyConsumerCount(broker, 2, dest);
+ verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest);
}
}
}
private void verifyConsumerCount(BrokerService broker, int count, Destination dest) throws Exception {
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
- Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
+ Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size());
}
+ private void verifyConsumePriority(BrokerService broker, byte expectedPriority, Destination dest) throws Exception {
+ RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+ Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
+ for (Subscription consumer : internalQueue.getConsumers()) {
+ assertEquals("consumer on " + broker.getBrokerName() + " matches priority: " + internalQueue, expectedPriority, consumer.getConsumerInfo().getPriority());
+ }
+ }
+
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();