You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/07/17 14:43:59 UTC

svn commit: r795069 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ConduitBridge.java test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java

Author: dejanb
Date: Fri Jul 17 12:43:59 2009
New Revision: 795069

URL: http://svn.apache.org/viewvc?rev=795069&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2327 - close proxy consumers

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=795069&r1=795068&r2=795069&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Fri Jul 17 12:43:59 2009
@@ -18,9 +18,11 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.filter.DestinationFilter;
@@ -55,6 +57,18 @@
         info.setSelector(null);
         return doCreateDemandSubscription(info);
     }
+    
+    /** Returns false only when two differing, non-empty broker paths share the same first and last broker; true otherwise. */
+    protected boolean checkPaths(BrokerId[] first, BrokerId[] second) {
+        if (first == null || second == null || first.length == 0 || second.length == 0) {
+            return true; // a missing or empty path has no endpoints to compare; treat like the null case
+        }
+        if (Arrays.equals(first, second)) {
+            return true;
+        }
+        return !(first[0].equals(second[0])
+                && first[first.length - 1].equals(second[second.length - 1]));
+    }
 
     protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
         // search through existing subscriptions and see if we have a match
@@ -62,6 +76,7 @@
         DestinationFilter filter = DestinationFilter.parseFilter(info.getDestination());
         for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
             DemandSubscription ds = (DemandSubscription)i.next();
+            
             if (filter.matches(ds.getLocalInfo().getDestination())) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo()
@@ -69,9 +84,10 @@
                 }
                 // add the interest in the subscription
                 // ds.add(ds.getRemoteInfo().getConsumerId());
-                ds.add(info.getConsumerId());
+                if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
+                	ds.add(info.getConsumerId());
+                }
                 matched = true;
-                
                 // continue - we want interest to any existing
                 // DemandSubscriptions
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=795069&r1=795068&r2=795069&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Fri Jul 17 12:43:59 2009
@@ -553,6 +553,8 @@
 
 
     public void testDuplicateQueueSubs() throws Exception {
+    	
+    	createBroker("BrokerD");
         
         bridgeAllBrokers("default", 3, false);
         startAllBrokers();
@@ -575,7 +577,7 @@
         for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
             BrokerService broker = i.next().broker;
             if (!brokerName.equals(broker.getBrokerName())) {
-                verifyConsumerCount(broker, 2, dest);
+                verifyConsumerCount(broker, 3, dest);
                 verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest);
             }
         }