You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/06/24 00:29:47 UTC

svn commit: r787855 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Tue Jun 23 22:29:47 2009
New Revision: 787855

URL: http://svn.apache.org/viewvc?rev=787855&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2298 - respect priority when suppressing duplicate network subscriptions

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=787855&r1=787854&r2=787855&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Tue Jun 23 22:29:47 2009
@@ -559,10 +559,6 @@
             // in a cyclic network there can be multiple bridges per broker that can propagate
             // a network subscription so there is a need to synchronise on a shared entity
             synchronized(brokerService.getVmConnectorURI()) {
-                if (isDuplicateNetworkSubscription(info)) {
-                    // trace in method
-                    return;
-                }
                 if (addConsumerInfo(info)) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
@@ -637,8 +633,8 @@
             if (LOG.isDebugEnabled()) {
                 LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
             }
-            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
             localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
         }
     }
 
@@ -951,50 +947,88 @@
     }
 
     protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
-        boolean result = false;
+        boolean consumerAdded = false;
         ConsumerInfo info = consumerInfo.copy();
         addRemoteBrokerToBrokerPath(info);
         DemandSubscription sub = createDemandSubscription(info);
         if (sub != null) {
-            addSubscription(sub);
-            result = true;
+            if (duplicateSuppressionIsRequired(sub) ) {
+                undoMapRegistration(sub);
+            } else {
+                addSubscription(sub);
+                consumerAdded = true;
+            }
         }
-        return result;
+        return consumerAdded;
     }
     
+    private void undoMapRegistration(DemandSubscription sub) {
+        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
+        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());    
+    }
+
     /*
      * check our existing subs networkConsumerIds against the list of network ids in this subscription
      * A match means a duplicate which we suppress for topics and maybe for queues
      */
-    private boolean isDuplicateNetworkSubscription(ConsumerInfo consumerInfo) {
-        boolean isDuplicate = false;
+    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
+        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
+        boolean suppress = false;
         
         if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) {
-            return isDuplicate;
-        }
-        
-        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
-        if (candidateConsumers.isEmpty()) {
-            candidateConsumers.add(consumerInfo.getConsumerId());
+            return suppress;
         }
         
+        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();        
         Collection<Subscription> currentSubs = 
             getRegionSubscriptions(consumerInfo.getDestination().isTopic());
         for (Subscription sub : currentSubs) {
             List<ConsumerId> networkConsumers =  sub.getConsumerInfo().getNetworkConsumerIds();
             if (!networkConsumers.isEmpty()) {
                 if (matchFound(candidateConsumers, networkConsumers)) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
-                                + ", sub: " + consumerInfo + " is duplicated by network subscription: " 
-                                + sub.getConsumerInfo()  + ", networkComsumerIds: " + networkConsumers);
-                    }
-                    isDuplicate = true;
+                    suppress = hasLowerPriority(sub, candidate.getLocalInfo());
                     break;
                 }
             }
         }
-        return isDuplicate;
+        return suppress;
+    }
+
+ 
+    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
+        boolean suppress = false;
+        
+        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
+                        + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " 
+                        + existingSub.getConsumerInfo()  + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
+            }
+            suppress = true;
+        } else {
+            // remove the existing lower priority duplicate and allow this candidate
+            try {
+                removeDuplicateSubscription(existingSub);
+                
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
+                            + " with sub from " + remoteBrokerName
+                            + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " 
+                            + candidateInfo.getNetworkConsumerIds());
+                }
+            } catch (IOException e) {
+                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: "+ existingSub, e);
+            }
+        }
+        return suppress;
+    }
+
+    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
+        for (NetworkConnector connector: brokerService.getNetworkConnectors()) {
+            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
+                break;
+            }
+        }     
     }
 
     private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
@@ -1034,15 +1068,14 @@
 
         if (configuration.isDecreaseNetworkConsumerPriority()) {
             byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
-            if (priority > Byte.MIN_VALUE && info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
-                // The longer the path to the consumer, the less it's consumer
-                // priority.
-                priority -= info.getBrokerPath().length + 1;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
-                }
+            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
+                // The longer the path to the consumer, the less it's consumer priority.
+                priority -= info.getBrokerPath().length + 1; 
             }
             result.getLocalInfo().setPriority(priority);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
+            }
         }
         configureDemandSubscription(info, result);
         return result;
@@ -1079,6 +1112,7 @@
         sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
     }
 
+    
     protected void removeDemandSubscription(ConsumerId id) throws IOException {
         DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
         if (LOG.isDebugEnabled()) {
@@ -1091,6 +1125,20 @@
             }
         }
     }
+    
+    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
+        boolean removeDone = false;
+        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
+        if (sub != null) {
+            try {
+                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
+                removeDone = true;   
+            } catch (IOException e) {
+                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
+            }     
+        }
+        return removeDone;
+    }
 
     protected void waitStarted() throws InterruptedException {
         startedLatch.await();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=787855&r1=787854&r2=787855&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Tue Jun 23 22:29:47 2009
@@ -16,13 +16,16 @@
  */
 package org.apache.activemq.network;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.management.MBeanServer;
@@ -34,6 +37,7 @@
 import org.apache.activemq.broker.jmx.NetworkBridgeView;
 import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.JMXSupport;
@@ -67,7 +71,8 @@
     private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
     private BrokerService brokerService;
     private ObjectName objectName;
-
+    private ConcurrentLinkedQueue<DemandForwardingBridgeSupport> configuredBridges = new ConcurrentLinkedQueue<DemandForwardingBridgeSupport>();
+ 
     public NetworkConnector() {
     }
 
@@ -186,6 +191,7 @@
             dest = (ActiveMQDestination[])topics.toArray(dest);
             result.setDurableDestinations(dest);
         }
+        configuredBridges.add(result);
         return result;
     }
 
@@ -271,4 +277,16 @@
                               + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
     }
 
+    // ask all the bridges as we can't know to which this consumer is tied
+    public boolean removeDemandSubscription(ConsumerId consumerId) {
+        boolean removeSucceeded = false;
+        for (DemandForwardingBridgeSupport bridge: configuredBridges) {
+            if (bridge.removeDemandSubscriptionByLocalId(consumerId)) {
+                removeSucceeded = true;
+                break;
+            }
+        }
+        return removeSucceeded;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=787855&r1=787854&r2=787855&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Tue Jun 23 22:29:47 2009
@@ -22,15 +22,24 @@
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -322,7 +331,7 @@
         // Send messages
         sendMessages("BrokerB", dest, messageCount);
 
-        assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+        assertTrue("messaged received within time limit", messagesReceived.await(30, TimeUnit.SECONDS));
         
         // Get message count
         MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
@@ -424,7 +433,7 @@
         sendMessages("BrokerB", dest, messageCount);
 
         // Let's try to wait for any messages.
-        assertTrue(messagesReceived.await(60, TimeUnit.SECONDS));
+        assertTrue("messages are received within limit", messagesReceived.await(60, TimeUnit.SECONDS));
         assertEquals(messageCount, msgs.getMessageCount());      
     }
 
@@ -451,8 +460,68 @@
             verifyConsumerCount(broker, 1, dest);
         }
     }
+
     
 
+    public void testNoDuplicateQueueSubsHasLowestPriority() throws Exception {
+        boolean suppressQueueDuplicateSubscriptions = true;
+        boolean decreaseNetworkConsumerPriority = true;
+        bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
+
+        // Setup destination
+        final Destination dest = createDestination("TEST.FOO", false);
+
+        // delay the advisory messages so that one can percolate fully (cyclicly) before the other
+        BrokerItem brokerB = brokers.get("BrokerA");
+        brokerB.broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() {
+
+            public Broker installPlugin(Broker broker) throws Exception {          
+                return new BrokerFilter(broker) {
+
+                    final AtomicInteger count = new AtomicInteger();
+                    @Override
+                    public void preProcessDispatch(
+                            MessageDispatch messageDispatch) {
+                        if (messageDispatch.getDestination().getPhysicalName().contains("ActiveMQ.Advisory.Consumer")) {
+                            // lets delay the first advisory
+                            if (count.getAndIncrement() == 0) {
+                                LOG.info("Sleeping on first advisory: " + messageDispatch);
+                                try {
+                                    Thread.sleep(2000);
+                                } catch (InterruptedException e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                        }
+                        super.postProcessDispatch(messageDispatch);
+                    }
+                    
+                };
+            }}
+        });
+        
+        startAllBrokers();
+
+    
+        // Setup consumers
+        String brokerName = "BrokerA";
+        createConsumer(brokerName, dest);
+        
+        // wait for advisories
+        Thread.sleep(5000);
+        
+        // verify there is one consumer on each broker, no cycles
+        Collection<BrokerItem> brokerList = brokers.values();
+        for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+            BrokerService broker = i.next().broker;
+            verifyConsumerCount(broker, 1, dest);
+            if (!brokerName.equals(broker.getBrokerName())) {
+                verifyConsumePriority(broker, ConsumerInfo.NETWORK_CONSUMER_PRIORITY, dest);
+            }
+        }
+    }
+
+
     public void testDuplicateQueueSubs() throws Exception {
         
         bridgeAllBrokers("default", 3, false);
@@ -477,16 +546,25 @@
             BrokerService broker = i.next().broker;
             if (!brokerName.equals(broker.getBrokerName())) {
                 verifyConsumerCount(broker, 2, dest);
+                verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest);
             }
         }
     }
 
     private void verifyConsumerCount(BrokerService broker, int count, Destination dest) throws Exception {
         RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
-        Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next(); 
+        Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
         assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size());      
     }
 
+    private void verifyConsumePriority(BrokerService broker, byte expectedPriority, Destination dest) throws Exception {
+        RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+        Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
+        for (Subscription consumer : internalQueue.getConsumers()) {
+            assertEquals("consumer on " + broker.getBrokerName() + " matches priority: " + internalQueue, expectedPriority, consumer.getConsumerInfo().getPriority());      
+        }
+    }
+    
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();