You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/03/09 20:55:18 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6204

Repository: activemq
Updated Branches:
  refs/heads/master 8b23e072e -> a2781e396


https://issues.apache.org/jira/browse/AMQ-6204

Fixing the removal logic on virtual destination remove inside of
Advisory Broker to clean up virtual destination maps properly.  Added a
test to verify.  Also added new debug logging to help track down any
future issues.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a2781e39
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a2781e39
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a2781e39

Branch: refs/heads/master
Commit: a2781e3966ded41a241d24ffb8d85d410c39eb21
Parents: 8b23e07
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Mar 9 19:52:01 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Mar 9 19:54:08 2016 +0000

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       | 64 +++++++++++++-------
 .../region/virtual/FilteredDestination.java     | 35 ++++++++++-
 .../network/VirtualConsumerDemandTest.java      | 55 ++++++++++++++++-
 3 files changed, 128 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a2781e39/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 564321f..5ac201e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -289,11 +289,10 @@ public class AdvisoryBroker extends BrokerFilter {
                         //in case of multiple matches
                         VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination);
                         ConsumerInfo i = brokerConsumerDests.get(key);
-                        if (consumerInfo.equals(i)) {
-                            if (brokerConsumerDests.remove(key) != null) {
-                                fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
-                                break;
-                            }
+                        if (consumerInfo.equals(i) && brokerConsumerDests.remove(key) != null) {
+                            LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", key, i);
+                            fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
+                            break;
                         }
                     }
                 }
@@ -549,6 +548,7 @@ public class AdvisoryBroker extends BrokerFilter {
         super.virtualDestinationAdded(context, virtualDestination);
 
         if (virtualDestinations.add(virtualDestination)) {
+            LOG.debug("Virtual destination added: {}", virtualDestination);
             try {
                 // Don't advise advisory topics.
                 if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
@@ -592,20 +592,25 @@ public class AdvisoryBroker extends BrokerFilter {
         //if no consumer info, we need to create one - this is the case when an advisory is fired
         //because of the existence of a destination matching a virtual destination
         if (info == null) {
-            ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
-            SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
-            ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
-
-            info = new ConsumerInfo(consumerId);
 
             //store the virtual destination and the activeMQDestination as a pair so that we can keep track
             //of all matching forwarded destinations that caused demand
-            if(brokerConsumerDests.putIfAbsent(new VirtualConsumerPair(virtualDestination, activeMQDest), info) == null) {
-                info.setDestination(virtualDestination.getVirtualDestination());
-                ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
-
-                if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
-                    fireConsumerAdvisory(context, info.getDestination(), topic, info);
+            VirtualConsumerPair pair = new VirtualConsumerPair(virtualDestination, activeMQDest);
+            if (brokerConsumerDests.get(pair) == null) {
+                ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
+                SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
+                ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+                info = new ConsumerInfo(consumerId);
+
+                if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
+                    LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
+                    info.setDestination(virtualDestination.getVirtualDestination());
+                    ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
+
+                    if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
+                        LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
+                        fireConsumerAdvisory(context, info.getDestination(), topic, info);
+                    }
                 }
             }
         //this is the case of a real consumer coming online
@@ -615,6 +620,7 @@ public class AdvisoryBroker extends BrokerFilter {
             ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
 
             if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
+                LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
                 fireConsumerAdvisory(context, info.getDestination(), topic, info);
             }
         }
@@ -626,6 +632,7 @@ public class AdvisoryBroker extends BrokerFilter {
         super.virtualDestinationRemoved(context, virtualDestination);
 
         if (virtualDestinations.remove(virtualDestination)) {
+            LOG.debug("Virtual destination removed: {}", virtualDestination);
             try {
                 consumersLock.readLock().lock();
                 try {
@@ -636,16 +643,17 @@ public class AdvisoryBroker extends BrokerFilter {
                                 //find all consumers for this virtual destination
                                 if (virtualDestinationConsumers.get(info).equals(virtualDestination)) {
                                     fireVirtualDestinationRemoveAdvisory(context, info);
-                                }
 
-                                //check consumers created for the existence of a destination to see if they
-                                //match the consumerinfo and clean up
-                                for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
-                                    ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
-                                    if (info.equals(i)) {
-                                        brokerConsumerDests.remove(activeMQDest);
+                                    //check consumers created for the existence of a destination to see if they
+                                    //match the consumerinfo and clean up
+                                    for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
+                                        ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
+                                        if (info.equals(i) && brokerConsumerDests.remove(activeMQDest) != null) {
+                                            LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", activeMQDest, i);
+                                        }
                                     }
                                 }
+
                             }
                         }
                     }
@@ -663,6 +671,7 @@ public class AdvisoryBroker extends BrokerFilter {
 
         VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info);
         if (virtualDestination != null) {
+            LOG.debug("Virtual consumer removed: {}, for virtual destination: {}", info, virtualDestination);
             ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination());
 
             ActiveMQDestination dest = info.getDestination();
@@ -898,6 +907,7 @@ public class AdvisoryBroker extends BrokerFilter {
             this.virtualDestination = virtualDestination;
             this.activeMQDestination = activeMQDestination;
         }
+
         @Override
         public int hashCode() {
             final int prime = 31;
@@ -913,6 +923,7 @@ public class AdvisoryBroker extends BrokerFilter {
                             .hashCode());
             return result;
         }
+
         @Override
         public boolean equals(Object obj) {
             if (this == obj)
@@ -936,6 +947,13 @@ public class AdvisoryBroker extends BrokerFilter {
                 return false;
             return true;
         }
+
+        @Override
+        public String toString() {
+            return "VirtualConsumerPair [virtualDestination=" + virtualDestination + ", activeMQDestination="
+                    + activeMQDestination + "]";
+        }
+
         private AdvisoryBroker getOuterType() {
             return AdvisoryBroker.this;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a2781e39/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
index dc81a0e..73cb548 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
@@ -30,10 +30,10 @@ import org.apache.activemq.selector.SelectorParser;
  *
  * @org.apache.xbean.XBean
  *
- * 
+ *
  */
 public class FilteredDestination {
-    
+
     private ActiveMQDestination destination;
     private String selector;
     private BooleanExpression filter;
@@ -91,4 +91,35 @@ public class FilteredDestination {
     public void setTopic(String topic) {
         setDestination(ActiveMQDestination.createDestination(topic, ActiveMQDestination.TOPIC_TYPE));
     }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((destination == null) ? 0 : destination.hashCode());
+        result = prime * result + ((selector == null) ? 0 : selector.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        FilteredDestination other = (FilteredDestination) obj;
+        if (destination == null) {
+            if (other.destination != null)
+                return false;
+        } else if (!destination.equals(other.destination))
+            return false;
+        if (selector == null) {
+            if (other.selector != null)
+                return false;
+        } else if (!selector.equals(other.selector))
+            return false;
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a2781e39/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index bff069b..3b46f8c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -545,6 +545,59 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
     }
 
     /**
+     * This tests that having 2 composite destinations (1 included for dynamic flow and 1 not)
+     * will allow messages to flow and that deleting 1 destination dosen't clear out the virtual
+     * consumer map except for what should be cleared.
+     *
+     */
+    @Test(timeout = 60 * 1000)
+    public void testTwoCompositeTopicsRemove1() throws Exception {
+        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
+
+        doSetUp(true, null);
+
+        //configure a virtual destination that forwards messages from topic testQueueName
+        //to queue "include.test.bar.bridge" and "include.test.bar.bridge2"
+        CompositeTopic compositeTopic1 = createCompositeTopic(testTopicName,
+                new ActiveMQQueue("include.test.bar.bridge"));
+        CompositeTopic compositeTopic2 = createCompositeTopic(testTopicName + 2,
+                new ActiveMQQueue("include.test.bar.bridge2"));
+
+        runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic1, compositeTopic2}, true);
+
+        MessageProducer includedProducer = localSession.createProducer(included);
+        Message test = localSession.createTextMessage("test");
+        Thread.sleep(1000);
+
+        final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics();
+        final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination(
+                new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
+
+        waitForConsumerCount(destinationStatistics, 1);
+
+        includedProducer.send(test);
+
+        waitForDispatchFromLocalBroker(destinationStatistics, 1);
+        assertLocalBrokerStatistics(destinationStatistics, 1);
+        assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount());
+
+        //verify there are 2 virtual destinations but only 1 consumer and broker dest
+        assertAdvisoryBrokerCounts(2,1,1);
+        runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic1}, true);
+        Thread.sleep(2000);
+        //verify there is is only 1 virtual dest after deletion
+        assertAdvisoryBrokerCounts(1,1,1);
+
+        includedProducer.send(test);
+
+        //make sure messages are still forwarded even after 1 composite topic was deleted
+        waitForDispatchFromLocalBroker(destinationStatistics, 2);
+        assertLocalBrokerStatistics(destinationStatistics, 2);
+        assertEquals("remote dest messages", 2, remoteDestStatistics.getMessages().getCount());
+
+    }
+
+    /**
      * Test that demand is destroyed after removing both targets from the composite Topic
      * @throws Exception
      */
@@ -1375,7 +1428,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
         ActiveMQMessage message = null;
         while ((message = (ActiveMQMessage) advisoryConsumer.receive(1000)) != null) {
             available++;
-            LOG.debug("advisory data structure: {}", message.getDataStructure());
+            LOG.info("advisory data structure: {}", message.getDataStructure());
         }
         assertEquals(count, available);
     }