You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/03/09 20:55:18 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6204
Repository: activemq
Updated Branches:
refs/heads/master 8b23e072e -> a2781e396
https://issues.apache.org/jira/browse/AMQ-6204
Fixing the logic in AdvisoryBroker that runs when a virtual destination
is removed, so that the virtual destination maps are cleaned up properly.
Added a test to verify. Also added new debug logging to help track down
any future issues.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a2781e39
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a2781e39
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a2781e39
Branch: refs/heads/master
Commit: a2781e3966ded41a241d24ffb8d85d410c39eb21
Parents: 8b23e07
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Mar 9 19:52:01 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Mar 9 19:54:08 2016 +0000
----------------------------------------------------------------------
.../activemq/advisory/AdvisoryBroker.java | 64 +++++++++++++-------
.../region/virtual/FilteredDestination.java | 35 ++++++++++-
.../network/VirtualConsumerDemandTest.java | 55 ++++++++++++++++-
3 files changed, 128 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a2781e39/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 564321f..5ac201e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -289,11 +289,10 @@ public class AdvisoryBroker extends BrokerFilter {
//in case of multiple matches
VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination);
ConsumerInfo i = brokerConsumerDests.get(key);
- if (consumerInfo.equals(i)) {
- if (brokerConsumerDests.remove(key) != null) {
- fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
- break;
- }
+ if (consumerInfo.equals(i) && brokerConsumerDests.remove(key) != null) {
+ LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", key, i);
+ fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
+ break;
}
}
}
@@ -549,6 +548,7 @@ public class AdvisoryBroker extends BrokerFilter {
super.virtualDestinationAdded(context, virtualDestination);
if (virtualDestinations.add(virtualDestination)) {
+ LOG.debug("Virtual destination added: {}", virtualDestination);
try {
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
@@ -592,20 +592,25 @@ public class AdvisoryBroker extends BrokerFilter {
//if no consumer info, we need to create one - this is the case when an advisory is fired
//because of the existence of a destination matching a virtual destination
if (info == null) {
- ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
- SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
- ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
-
- info = new ConsumerInfo(consumerId);
//store the virtual destination and the activeMQDestination as a pair so that we can keep track
//of all matching forwarded destinations that caused demand
- if(brokerConsumerDests.putIfAbsent(new VirtualConsumerPair(virtualDestination, activeMQDest), info) == null) {
- info.setDestination(virtualDestination.getVirtualDestination());
- ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
-
- if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
- fireConsumerAdvisory(context, info.getDestination(), topic, info);
+ VirtualConsumerPair pair = new VirtualConsumerPair(virtualDestination, activeMQDest);
+ if (brokerConsumerDests.get(pair) == null) {
+ ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
+ SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
+ ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+ info = new ConsumerInfo(consumerId);
+
+ if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
+ LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
+ info.setDestination(virtualDestination.getVirtualDestination());
+ ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
+
+ if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
+ LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
+ fireConsumerAdvisory(context, info.getDestination(), topic, info);
+ }
}
}
//this is the case of a real consumer coming online
@@ -615,6 +620,7 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
+ LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
fireConsumerAdvisory(context, info.getDestination(), topic, info);
}
}
@@ -626,6 +632,7 @@ public class AdvisoryBroker extends BrokerFilter {
super.virtualDestinationRemoved(context, virtualDestination);
if (virtualDestinations.remove(virtualDestination)) {
+ LOG.debug("Virtual destination removed: {}", virtualDestination);
try {
consumersLock.readLock().lock();
try {
@@ -636,16 +643,17 @@ public class AdvisoryBroker extends BrokerFilter {
//find all consumers for this virtual destination
if (virtualDestinationConsumers.get(info).equals(virtualDestination)) {
fireVirtualDestinationRemoveAdvisory(context, info);
- }
- //check consumers created for the existence of a destination to see if they
- //match the consumerinfo and clean up
- for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
- ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
- if (info.equals(i)) {
- brokerConsumerDests.remove(activeMQDest);
+ //check consumers created for the existence of a destination to see if they
+ //match the consumerinfo and clean up
+ for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
+ ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
+ if (info.equals(i) && brokerConsumerDests.remove(activeMQDest) != null) {
+ LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", activeMQDest, i);
+ }
}
}
+
}
}
}
@@ -663,6 +671,7 @@ public class AdvisoryBroker extends BrokerFilter {
VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info);
if (virtualDestination != null) {
+ LOG.debug("Virtual consumer removed: {}, for virtual destination: {}", info, virtualDestination);
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination());
ActiveMQDestination dest = info.getDestination();
@@ -898,6 +907,7 @@ public class AdvisoryBroker extends BrokerFilter {
this.virtualDestination = virtualDestination;
this.activeMQDestination = activeMQDestination;
}
+
@Override
public int hashCode() {
final int prime = 31;
@@ -913,6 +923,7 @@ public class AdvisoryBroker extends BrokerFilter {
.hashCode());
return result;
}
+
@Override
public boolean equals(Object obj) {
if (this == obj)
@@ -936,6 +947,13 @@ public class AdvisoryBroker extends BrokerFilter {
return false;
return true;
}
+
+ @Override
+ public String toString() {
+ return "VirtualConsumerPair [virtualDestination=" + virtualDestination + ", activeMQDestination="
+ + activeMQDestination + "]";
+ }
+
private AdvisoryBroker getOuterType() {
return AdvisoryBroker.this;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a2781e39/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
index dc81a0e..73cb548 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
@@ -30,10 +30,10 @@ import org.apache.activemq.selector.SelectorParser;
*
* @org.apache.xbean.XBean
*
- *
+ *
*/
public class FilteredDestination {
-
+
private ActiveMQDestination destination;
private String selector;
private BooleanExpression filter;
@@ -91,4 +91,35 @@ public class FilteredDestination {
public void setTopic(String topic) {
setDestination(ActiveMQDestination.createDestination(topic, ActiveMQDestination.TOPIC_TYPE));
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((destination == null) ? 0 : destination.hashCode());
+ result = prime * result + ((selector == null) ? 0 : selector.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ FilteredDestination other = (FilteredDestination) obj;
+ if (destination == null) {
+ if (other.destination != null)
+ return false;
+ } else if (!destination.equals(other.destination))
+ return false;
+ if (selector == null) {
+ if (other.selector != null)
+ return false;
+ } else if (!selector.equals(other.selector))
+ return false;
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a2781e39/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index bff069b..3b46f8c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -545,6 +545,59 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
}
/**
+ * This tests that having 2 composite destinations (1 included for dynamic flow and 1 not)
+ * will allow messages to flow and that deleting 1 destination doesn't clear out the virtual
+ * consumer map except for what should be cleared.
+ *
+ */
+ @Test(timeout = 60 * 1000)
+ public void testTwoCompositeTopicsRemove1() throws Exception {
+ Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
+
+ doSetUp(true, null);
+
+ //configure a virtual destination that forwards messages from topic testQueueName
+ //to queue "include.test.bar.bridge" and "include.test.bar.bridge2"
+ CompositeTopic compositeTopic1 = createCompositeTopic(testTopicName,
+ new ActiveMQQueue("include.test.bar.bridge"));
+ CompositeTopic compositeTopic2 = createCompositeTopic(testTopicName + 2,
+ new ActiveMQQueue("include.test.bar.bridge2"));
+
+ runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic1, compositeTopic2}, true);
+
+ MessageProducer includedProducer = localSession.createProducer(included);
+ Message test = localSession.createTextMessage("test");
+ Thread.sleep(1000);
+
+ final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics();
+ final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination(
+ new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
+
+ waitForConsumerCount(destinationStatistics, 1);
+
+ includedProducer.send(test);
+
+ waitForDispatchFromLocalBroker(destinationStatistics, 1);
+ assertLocalBrokerStatistics(destinationStatistics, 1);
+ assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount());
+
+ //verify there are 2 virtual destinations but only 1 consumer and broker dest
+ assertAdvisoryBrokerCounts(2,1,1);
+ runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic1}, true);
+ Thread.sleep(2000);
+ //verify there is only 1 virtual dest after deletion
+ assertAdvisoryBrokerCounts(1,1,1);
+
+ includedProducer.send(test);
+
+ //make sure messages are still forwarded even after 1 composite topic was deleted
+ waitForDispatchFromLocalBroker(destinationStatistics, 2);
+ assertLocalBrokerStatistics(destinationStatistics, 2);
+ assertEquals("remote dest messages", 2, remoteDestStatistics.getMessages().getCount());
+
+ }
+
+ /**
* Test that demand is destroyed after removing both targets from the composite Topic
* @throws Exception
*/
@@ -1375,7 +1428,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
ActiveMQMessage message = null;
while ((message = (ActiveMQMessage) advisoryConsumer.receive(1000)) != null) {
available++;
- LOG.debug("advisory data structure: {}", message.getDataStructure());
+ LOG.info("advisory data structure: {}", message.getDataStructure());
}
assertEquals(count, available);
}