You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/07/17 14:43:59 UTC
svn commit: r795069 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/ConduitBridge.java
test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
Author: dejanb
Date: Fri Jul 17 12:43:59 2009
New Revision: 795069
URL: http://svn.apache.org/viewvc?rev=795069&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2327 - close proxy consumers
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=795069&r1=795068&r2=795069&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Fri Jul 17 12:43:59 2009
@@ -18,9 +18,11 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter;
@@ -55,6 +57,18 @@
info.setSelector(null);
return doCreateDemandSubscription(info);
}
+
+ protected boolean checkPaths(BrokerId[] first, BrokerId[] second) {
+ if (first == null || second == null)
+ return true;
+ if (Arrays.equals(first, second))
+ return true;
+ if (first[0].equals(second[0])
+ && first[first.length - 1].equals(second[second.length - 1]))
+ return false;
+ else
+ return true;
+ }
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
// search through existing subscriptions and see if we have a match
@@ -62,6 +76,7 @@
DestinationFilter filter = DestinationFilter.parseFilter(info.getDestination());
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
DemandSubscription ds = (DemandSubscription)i.next();
+
if (filter.matches(ds.getLocalInfo().getDestination())) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo()
@@ -69,9 +84,10 @@
}
// add the interest in the subscription
// ds.add(ds.getRemoteInfo().getConsumerId());
- ds.add(info.getConsumerId());
+ if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
+ ds.add(info.getConsumerId());
+ }
matched = true;
-
// continue - we want interest to any existing
// DemandSubscriptions
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=795069&r1=795068&r2=795069&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Fri Jul 17 12:43:59 2009
@@ -553,6 +553,8 @@
public void testDuplicateQueueSubs() throws Exception {
+
+ createBroker("BrokerD");
bridgeAllBrokers("default", 3, false);
startAllBrokers();
@@ -575,7 +577,7 @@
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
if (!brokerName.equals(broker.getBrokerName())) {
- verifyConsumerCount(broker, 2, dest);
+ verifyConsumerCount(broker, 3, dest);
verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest);
}
}