You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/04/08 06:12:01 UTC

svn commit: r762925 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ test/java/org/apache/activemq/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Tue Apr  7 19:48:29 2009
New Revision: 762925

URL: http://svn.apache.org/viewvc?rev=762925&view=rev
Log:
Implement AMQ-2198 (https://issues.apache.org/activemq/browse/AMQ-2198): add a setSuppressDuplicateQueueSubscriptions option to the network bridge configuration. When true, duplicate network subscriptions for the same queue consumer are suppressed; this is relevant when networkTTL > 1.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=762925&r1=762924&r2=762925&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Tue Apr  7 19:48:29 2009
@@ -940,27 +940,32 @@
     
     /*
      * check our existing subs networkConsumerIds against the list of network ids in this subscription
-     * a match means a duplicate which we suppress for topics
+     * A match means a duplicate, which we always suppress for topics and, for queues, only when suppressDuplicateQueueSubscriptions is enabled
      */
     private boolean isDuplicateNetworkSubscription(ConsumerInfo consumerInfo) {
         boolean isDuplicate = false;
-        if (consumerInfo.getDestination().isTopic()) {
-            List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
-            if (candidateConsumers.isEmpty()) {
-                candidateConsumers.add(consumerInfo.getConsumerId());
-            }
-            Collection<Subscription> currentSubs = getTopicRegionSubscriptions();
-            for (Subscription sub : currentSubs) {
-                List<ConsumerId> networkConsumers =  sub.getConsumerInfo().getNetworkConsumerIds();
-                if (!networkConsumers.isEmpty()) {
-                    if (matchFound(candidateConsumers, networkConsumers)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("subscription: " + consumerInfo + " is duplicated by network subscription: " 
-                                    + sub.getConsumerInfo()  + ", networkComsumerIds: " + networkConsumers);
-                        }
-                        isDuplicate = true;
-                        break;
+        
+        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) {
+            return isDuplicate;
+        }
+        
+        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
+        if (candidateConsumers.isEmpty()) {
+            candidateConsumers.add(consumerInfo.getConsumerId());
+        }
+        
+        Collection<Subscription> currentSubs = 
+            getRegionSubscriptions(consumerInfo.getDestination().isTopic());
+        for (Subscription sub : currentSubs) {
+            List<ConsumerId> networkConsumers =  sub.getConsumerInfo().getNetworkConsumerIds();
+            if (!networkConsumers.isEmpty()) {
+                if (matchFound(candidateConsumers, networkConsumers)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("subscription: " + consumerInfo + " is duplicated by network subscription: " 
+                                + sub.getConsumerInfo()  + ", networkComsumerIds: " + networkConsumers);
                     }
+                    isDuplicate = true;
+                    break;
                 }
             }
         }
@@ -978,12 +983,13 @@
         return found;
     }
 
-    private final Collection<Subscription> getTopicRegionSubscriptions() {
+    private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
         RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
-        AbstractRegion topicRegion = (AbstractRegion) region.getTopicRegion();
-        return topicRegion.getSubscriptions().values();
+        AbstractRegion abstractRegion = (AbstractRegion) 
+            (isTopic ? region.getTopicRegion() : region.getQueueRegion());
+        return abstractRegion.getSubscriptions().values();
     }
-
+    
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
         //add our original id to ourselves
         info.addNetworkConsumerId(info.getConsumerId());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=762925&r1=762924&r2=762925&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Tue Apr  7 19:48:29 2009
@@ -36,6 +36,7 @@
     private String password;
     private String destinationFilter = ">";
     private String name = null;
+    private boolean suppressDuplicateQueueSubscriptions = false;
 
     /**
      * @return the conduitSubscriptions
@@ -222,4 +223,16 @@
     public void setName(String name) {
         this.name = name;
     }
+
+    public boolean isSuppressDuplicateQueueSubscriptions() {
+        return suppressDuplicateQueueSubscriptions;
+    }
+    
+    /**
+     * 
+     * @param val if true, duplicate network queue subscriptions (in a cyclic network) will be suppressed
+     */
+    public void setSuppressDuplicateQueueSubscriptions(boolean val) {
+        suppressDuplicateQueueSubscriptions = val;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=762925&r1=762924&r2=762925&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Tue Apr  7 19:48:29 2009
@@ -115,10 +115,10 @@
 
     // This will interconnect all brokes using multicast
     protected void bridgeAllBrokers() throws Exception {
-        bridgeAllBrokers("default");
+        bridgeAllBrokers("default", 1, false);
     }
 
-    protected void bridgeAllBrokers(String groupName) throws Exception {
+    protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
         Collection<BrokerItem> brokerList = brokers.values();
         for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
             BrokerService broker = i.next().broker;
@@ -131,7 +131,9 @@
 
             TransportConnector transport = (TransportConnector)transportConnectors.get(0);
             transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName));
-            broker.addNetworkConnector("multicast://default?group=" + groupName);
+            NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName);
+            nc.setNetworkTTL(ttl);
+            nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
         }
 
         // Multicasting may take longer to setup

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=762925&r1=762924&r2=762925&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Tue Apr  7 19:48:29 2009
@@ -17,12 +17,18 @@
 package org.apache.activemq.usecases;
 
 import java.net.URI;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.MessageIdList;
 
 /**
@@ -270,6 +276,65 @@
         assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
     }
 
+    public void testNoDuplicateQueueSubs() throws Exception {
+        
+        bridgeAllBrokers("default", 3, true);
+        
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        String brokerName = "BrokerA";
+        createConsumer(brokerName, dest);
+        
+        // wait for advisories
+        Thread.sleep(2000);
+        
+        // verify there is one consumer on each broker, no cycles
+        Collection<BrokerItem> brokerList = brokers.values();
+        for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+            BrokerService broker = i.next().broker;
+            verifyConsumerCount(broker, 1, dest);
+        }
+    }
+    
+
+    public void testDuplicateQueueSubs() throws Exception {
+        
+        bridgeAllBrokers("default", 3, false);
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        String brokerName = "BrokerA";
+        createConsumer(brokerName, dest);
+        
+        // wait for advisories
+        Thread.sleep(2000);
+        
+        verifyConsumerCount(brokers.get(brokerName).broker, 1, dest);
+        
+        // in a cyclic network, each of the other brokers gets a second-order consumer:
+        // an alternative route to A via the other broker
+        Collection<BrokerItem> brokerList = brokers.values();
+        for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+            BrokerService broker = i.next().broker;
+            if (!brokerName.equals(broker.getBrokerName())) {
+                verifyConsumerCount(broker, 2, dest);
+            }
+        }
+    }
+
+    private void verifyConsumerCount(BrokerService broker, int count, Destination dest) throws Exception {
+        RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+        Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next(); 
+        assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size());      
+    }
+
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();