You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2017/12/14 21:01:33 UTC
activemq git commit: AMQ-6875 - Use the correct destination for
Virtual destination consumers when using Virtual Topics
Repository: activemq
Updated Branches:
refs/heads/master d3e439378 -> 56baba96c
AMQ-6875 - Use the correct destination for Virtual destination consumers
when using Virtual Topics
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/56baba96
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/56baba96
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/56baba96
Branch: refs/heads/master
Commit: 56baba96c657d4e44b88955a964d6c92ff39b822
Parents: d3e4393
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Dec 14 16:00:37 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Dec 14 16:01:10 2017 -0500
----------------------------------------------------------------------
.../activemq/advisory/AdvisoryBroker.java | 46 +++++++++++++++++++-
.../network/VirtualConsumerDemandTest.java | 24 ++++++++--
2 files changed, 65 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/56baba96/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 1acd524..1508c61 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -17,6 +17,7 @@
package org.apache.activemq.advisory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -42,6 +43,7 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@@ -58,6 +60,7 @@ import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
+import org.apache.activemq.filter.DestinationPath;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
@@ -604,7 +607,7 @@ public class AdvisoryBroker extends BrokerFilter {
if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
- info.setDestination(virtualDestination.getVirtualDestination());
+ setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
@@ -616,7 +619,7 @@ public class AdvisoryBroker extends BrokerFilter {
//this is the case of a real consumer coming online
} else {
info = info.copy();
- info.setDestination(virtualDestination.getVirtualDestination());
+ setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
@@ -626,6 +629,45 @@ public class AdvisoryBroker extends BrokerFilter {
}
}
+ /**
+ * Sets the destination on the ConsumerInfo to the virtual destination's own destination.
+ * If this is a VirtualTopic then the destination used will be the actual topic subscribed
+ * to (activeMQDest minus the prefix/postfix path segments) in order to track demand properly
+ *
+ * @param info the ConsumerInfo whose destination is set by this method
+ * @param virtualDestination source of the default destination; a VirtualTopic also supplies the prefix/postfix
+ * @param activeMQDest destination whose path segments are trimmed to derive the topic for a VirtualTopic
+ */
+ private void setConsumerInfoVirtualDest(ConsumerInfo info, VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) {
+ info.setDestination(virtualDestination.getVirtualDestination());
+ if (virtualDestination instanceof VirtualTopic) {
+ VirtualTopic vt = (VirtualTopic) virtualDestination;
+ String prefix = vt.getPrefix() != null ? vt.getPrefix() : "";
+ String postfix = vt.getPostfix() != null ? vt.getPostfix() : "";
+ if (prefix.endsWith(".")) {
+ prefix = prefix.substring(0, prefix.length() - 1);
+ }
+ if (postfix.startsWith(".")) {
+ postfix = postfix.substring(1, postfix.length());
+ }
+ ActiveMQDestination prefixDestination = prefix.length() > 0 ? new ActiveMQTopic(prefix) : null;
+ ActiveMQDestination postfixDestination = postfix.length() > 0 ? new ActiveMQTopic(postfix) : null;
+
+ String[] prefixPaths = prefixDestination != null ? prefixDestination.getDestinationPaths() : new String[] {};
+ String[] activeMQDestPaths = activeMQDest.getDestinationPaths();
+ String[] postfixPaths = postfixDestination != null ? postfixDestination.getDestinationPaths() : new String[] {};
+
+ //sanity check: only rewrite the destination if at least one path segment is left after trimming; segments are dropped by count, not matched against the prefix/postfix
+ if (activeMQDestPaths.length > prefixPaths.length + postfixPaths.length) {
+ String[] topicPath = Arrays.copyOfRange(activeMQDestPaths, 0 + prefixPaths.length,
+ activeMQDestPaths.length - postfixPaths.length);
+
+ ActiveMQTopic newTopic = new ActiveMQTopic(DestinationPath.toString(topicPath));
+ info.setDestination(newTopic);
+ }
+ }
+ }
+
@Override
public void virtualDestinationRemoved(ConnectionContext context,
VirtualDestination virtualDestination) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/56baba96/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index 782f53f..af5c316 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -118,31 +118,49 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
* @throws Exception
*/
@Test(timeout = 60 * 1000)
- public void testVirtualTopic() throws Exception {
+ public void testVirtualTopics() throws Exception {
Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
doSetUp(true, null);
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");
MessageProducer includedProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar"));
+ MessageProducer includedProducer2 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar2"));
+ MessageProducer includedProducer3 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar3"));
Thread.sleep(2000);
Message test = localSession.createTextMessage("test");
final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics();
+ final DestinationStatistics destinationStatistics2 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar2")).getDestinationStatistics();
+
+ //No Consumer.cons1 queue is created on the remote side for bar3 (only bar and bar2 below), so it should not be forwarded
+ final DestinationStatistics destinationStatistics3 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar3")).getDestinationStatistics();
//this will create the destination so messages accumulate
final DestinationStatistics remoteStats = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")).getDestinationStatistics();
+ final DestinationStatistics remoteStats2 = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar2")).getDestinationStatistics();
+
waitForConsumerCount(destinationStatistics, 1);
+ waitForConsumerCount(destinationStatistics2, 1);
includedProducer.send(test);
+ includedProducer2.send(localSession.createTextMessage("test2"));
+ includedProducer3.send(localSession.createTextMessage("test3"));
//assert statistics
waitForDispatchFromLocalBroker(destinationStatistics, 1);
+ waitForDispatchFromLocalBroker(destinationStatistics2, 1);
assertLocalBrokerStatistics(destinationStatistics, 1);
+ assertLocalBrokerStatistics(destinationStatistics2, 1);
assertEquals("remote dest messages", 1, remoteStats.getMessages().getCount());
+ assertEquals("remote dest messages", 1, remoteStats2.getMessages().getCount());
- assertRemoteAdvisoryCount(advisoryConsumer, 1);
- assertAdvisoryBrokerCounts(1,1,1);
+ assertRemoteAdvisoryCount(advisoryConsumer, 2);
+ assertAdvisoryBrokerCounts(1,2,2);
+
+ //should not have forwarded for 3rd topic (bar3): after a short wait its local dispatched count must still be 0
+ Thread.sleep(1000);
+ assertEquals("local broker dest stat dispatched", 0, destinationStatistics3.getDispatched().getCount());
}